Skip to content

Commit

Permalink
Add listener API that notifies on IOError (facebook#9177)
Browse files Browse the repository at this point in the history
Summary:
Add a new API in listener.h that notifies about IOErrors on
Read/Write/Append/Flush etc. The API reports about IOStatus, filename, Operation
name, offset and length.

Pull Request resolved: facebook#9177

Test Plan: Added new unit tests

Reviewed By: anand1976

Differential Revision: D32470627

Pulled By: akankshamahajan15

fbshipit-source-id: 189a717033590ae227b3beae8b1e7e185e4cdc12
  • Loading branch information
akankshamahajan15 authored and facebook-github-bot committed Nov 19, 2021
1 parent d949323 commit 4a7c1dc
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 2 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Added new option "adaptive_readahead" in ReadOptions. For iterators, RocksDB does auto-readahead on noticing sequential reads and by enabling this option, readahead_size of current file (if reads are sequential) will be carried forward to next file instead of starting from the scratch at each level (except L0 level files). If reads are not sequential it will fall back to 8KB. This option is applicable only for RocksDB internal prefetch buffer and isn't supported with underlying file system prefetching.
* Added the read count and read bytes related stats to Statistics for tiered storage hot, warm, and cold file reads.
* Added an option to dynamically charge an updating estimated memory usage of block-based table building to block cache if block cache available. It currently only includes charging memory usage of constructing (new) Bloom Filter and Ribbon Filter to block cache. To enable this feature, set `BlockBasedTableOptions::reserve_table_builder_memory = true`.
* Add a new API OnIOError in listener.h that notifies listeners when an IO error occurs during FileSystem operation along with filename, status etc.

### Bug Fixes
* Prevent a `CompactRange()` with `CompactRangeOptions::change_level == true` from possibly causing corruption to the LSM state (overlapping files within a level) when run in parallel with another manual compaction. Note that setting `force_consistency_checks == true` (the default) would cause the DB to enter read-only mode in this scenario and return `Status::Corruption`, rather than committing any corruption.
Expand Down
16 changes: 15 additions & 1 deletion file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
io_s);
if (!io_s.ok()) {
NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
tmp.size(), orig_offset);
}
}

buf.Size(buf.CurrentSize() + tmp.size());
Expand Down Expand Up @@ -245,9 +249,13 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
finish_ts, io_s);

if (!io_s.ok()) {
NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
tmp_result.size(), offset + pos);
}
}
#endif

if (res_scratch == nullptr) {
// we can't simply use `scratch` because reads of mmap'd files return
// data in a different buffer.
Expand Down Expand Up @@ -431,6 +439,12 @@ IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
start_ts, finish_ts, read_reqs[i].status);
}
if (!read_reqs[i].status.ok()) {
NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
file_name(), read_reqs[i].result.size(),
read_reqs[i].offset);
}

#endif // ROCKSDB_LITE
IOSTATS_ADD(bytes_read, read_reqs[i].result.size());
IOStatsAddBytesByTemperature(file_temperature_,
Expand Down
16 changes: 16 additions & 0 deletions file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,23 @@ class RandomAccessFileReader {
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);
}
info.status.PermitUncheckedError();
}

void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation,
const std::string& file_path, size_t length,
uint64_t offset) const {
if (listeners_.empty()) {
return;
}
IOErrorInfo io_error_info(io_status, operation, file_path, length, offset);

for (auto& listener : listeners_) {
listener->OnIOError(io_error_info);
}
io_status.PermitUncheckedError();
}

#endif // ROCKSDB_LITE

bool ShouldNotifyListeners() const { return !listeners_.empty(); }
Expand Down
1 change: 1 addition & 0 deletions file/sequence_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SequentialFileReader {
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);
}
info.status.PermitUncheckedError();
}

void AddFileIOListeners(
Expand Down
38 changes: 38 additions & 0 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ IOStatus WritableFileWriter::Close() {
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
if (!interim.ok()) {
NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
filesize_);
}
}
#endif
}
Expand All @@ -237,6 +241,9 @@ IOStatus WritableFileWriter::Close() {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileSyncFinish(start_ts, finish_ts, s,
FileOperationType::kFsync);
if (!interim.ok()) {
NotifyOnIOError(interim, FileOperationType::kFsync, file_name());
}
}
#endif
}
Expand All @@ -259,6 +266,9 @@ IOStatus WritableFileWriter::Close() {
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileCloseFinish(start_ts, finish_ts, s);
if (!interim.ok()) {
NotifyOnIOError(interim, FileOperationType::kClose, file_name());
}
}
#endif
}
Expand Down Expand Up @@ -318,6 +328,9 @@ IOStatus WritableFileWriter::Flush() {
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileFlushFinish(start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kFlush, file_name());
}
}
#endif
}
Expand Down Expand Up @@ -425,6 +438,11 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
NotifyOnFileSyncFinish(
start_ts, finish_ts, s,
use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
if (!s.ok()) {
NotifyOnIOError(
s, (use_fsync ? FileOperationType::kFsync : FileOperationType::kSync),
file_name());
}
}
#endif
SetPerfLevel(prev_perf_level);
Expand All @@ -445,6 +463,10 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kRangeSync, file_name(), nbytes,
offset);
}
}
#endif
return s;
Expand Down Expand Up @@ -500,6 +522,10 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kAppend, file_name(), allowed,
old_size);
}
}
#endif
if (!s.ok()) {
Expand Down Expand Up @@ -570,6 +596,10 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data,
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kAppend, file_name(), left,
old_size);
}
}
#endif
if (!s.ok()) {
Expand Down Expand Up @@ -671,6 +701,10 @@ IOStatus WritableFileWriter::WriteDirect() {
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
size, write_offset);
}
}
if (!s.ok()) {
buf_.Size(file_advance + leftover_tail);
Expand Down Expand Up @@ -761,6 +795,10 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() {
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s);
if (!s.ok()) {
NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
left, write_offset);
}
}
if (!s.ok()) {
// In this case, we do not change buffered_data_crc32c_checksum_ because
Expand Down
13 changes: 13 additions & 0 deletions file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,19 @@ class WritableFileWriter {
}
info.status.PermitUncheckedError();
}

void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation,
const std::string& file_path, size_t length = 0,
uint64_t offset = 0) {
if (listeners_.empty()) {
return;
}
IOErrorInfo io_error_info(io_status, operation, file_path, length, offset);
for (auto& listener : listeners_) {
listener->OnIOError(io_error_info);
}
io_error_info.io_status.PermitUncheckedError();
}
#endif // ROCKSDB_LITE

bool ShouldNotifyListeners() const { return !listeners_.empty(); }
Expand Down
27 changes: 26 additions & 1 deletion include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/customizable.h"
#include "rocksdb/io_status.h"
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
Expand Down Expand Up @@ -237,7 +238,10 @@ enum class FileOperationType {
kFlush,
kSync,
kFsync,
kRangeSync
kRangeSync,
kAppend,
kPositionedAppend,
kOpen
};

struct FileOperationInfo {
Expand Down Expand Up @@ -449,6 +453,23 @@ struct ExternalFileIngestionInfo {
TableProperties table_properties;
};

struct IOErrorInfo {
IOErrorInfo(const IOStatus& _io_status, FileOperationType _operation,
const std::string& _file_path, size_t _length, uint64_t _offset)
: io_status(_io_status),
operation(_operation),
file_path(_file_path),
length(_length),
offset(_offset) {}

IOStatus io_status;
FileOperationType operation;
std::string file_path;
size_t length;
uint64_t offset;
;
};

// EventListener class contains a set of callback functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
Expand Down Expand Up @@ -705,6 +726,10 @@ class EventListener : public Customizable {
// returned value.
virtual void OnBlobFileDeleted(const BlobFileDeletionInfo& /*info*/) {}

// A callback function for RocksDB which will be called whenever an IO error
// happens. ShouldBeNotifiedOnFileIO should be set to true to get a callback.
virtual void OnIOError(const IOErrorInfo& /*info*/) {}

virtual ~EventListener() {}
};

Expand Down
115 changes: 115 additions & 0 deletions util/file_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,121 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
}
}

#ifndef ROCKSDB_LITE
class IOErrorEventListener : public EventListener {
public:
IOErrorEventListener() { notify_error_.store(0); }

void OnIOError(const IOErrorInfo& io_error_info) override {
notify_error_++;
EXPECT_FALSE(io_error_info.file_path.empty());
EXPECT_FALSE(io_error_info.io_status.ok());
}

size_t NotifyErrorCount() { return notify_error_; }

bool ShouldBeNotifiedOnFileIO() override { return true; }

private:
std::atomic<size_t> notify_error_;
};

TEST_F(DBWritableFileWriterTest, IOErrorNotification) {
class FakeWF : public FSWritableFile {
public:
explicit FakeWF() : io_error_(false) {
file_append_errors_.store(0);
file_flush_errors_.store(0);
}

using FSWritableFile::Append;
IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
if (io_error_) {
file_append_errors_++;
return IOStatus::IOError("Fake IO error");
}
return IOStatus::OK();
}

using FSWritableFile::PositionedAppend;
IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
if (io_error_) {
return IOStatus::IOError("Fake IO error");
}
return IOStatus::OK();
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
if (io_error_) {
file_flush_errors_++;
return IOStatus::IOError("Fake IO error");
}
return IOStatus::OK();
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}

void SetIOError(bool val) { io_error_ = val; }

void CheckCounters(int file_append_errors, int file_flush_errors) {
ASSERT_EQ(file_append_errors, file_append_errors_);
ASSERT_EQ(file_flush_errors_, file_flush_errors);
}

protected:
bool io_error_;
std::atomic<size_t> file_append_errors_;
std::atomic<size_t> file_flush_errors_;
};

FileOptions file_options = FileOptions();
Options options = GetDefaultOptions();
options.create_if_missing = true;
IOErrorEventListener* listener = new IOErrorEventListener();
options.listeners.emplace_back(listener);

DestroyAndReopen(options);
ImmutableOptions ioptions(options);

std::string fname = this->dbname_ + "/test_file";
std::unique_ptr<FakeWF> writable_file_ptr(new FakeWF);

std::unique_ptr<WritableFileWriter> file_writer;
writable_file_ptr->SetIOError(true);

file_writer.reset(new WritableFileWriter(
std::move(writable_file_ptr), fname, file_options,
SystemClock::Default().get(), nullptr, ioptions.stats, ioptions.listeners,
ioptions.file_checksum_gen_factory.get(), true, true));

FakeWF* fwf = static_cast<FakeWF*>(file_writer->writable_file());

fwf->SetIOError(true);
ASSERT_NOK(file_writer->Append(std::string(2 * kMb, 'a')));
fwf->CheckCounters(1, 0);
ASSERT_EQ(listener->NotifyErrorCount(), 1);

fwf->SetIOError(true);
ASSERT_NOK(file_writer->Flush());
fwf->CheckCounters(1, 1);
ASSERT_EQ(listener->NotifyErrorCount(), 2);

/* No error generation */
fwf->SetIOError(false);
ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b')));
ASSERT_EQ(listener->NotifyErrorCount(), 2);
fwf->CheckCounters(1, 1);
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down

0 comments on commit 4a7c1dc

Please sign in to comment.