Skip to content

Commit

Permalink
Store FSRandomAccessFilePtr object in RandomAccessFileReader (facebook#7192)
Browse files Browse the repository at this point in the history
Summary:
Replace the FSRandomAccessFile pointer with an FSRandomAccessFilePtr
    object in RandomAccessFileReader.
    This new object wraps the FSRandomAccessFile pointer.

    Objective: If tracing is enabled, FSRandomAccessFilePtr returns an
    FSRandomAccessFileTracingWrapper pointer, which records all necessary
    information in an IORecord, calls the underlying FileSystem, and invokes
    IOTracer to dump that record to a binary file. If tracing is disabled,
    the underlying FSRandomAccessFile pointer is returned directly.
    The FSRandomAccessFilePtr wrapper class is added to bypass the
    FSRandomAccessFileTracingWrapper when tracing is disabled.

    Test Plan: make check -j64

Pull Request resolved: facebook#7192

Reviewed By: anand1976

Differential Revision: D23356867

Pulled By: akankshamahajan15

fbshipit-source-id: 48f31168166a17a7444b40be44a9a9d4a5c7182c
  • Loading branch information
akankshamahajan15 authored and facebook-github-bot committed Aug 27, 2020
1 parent 9aad24d commit 8e0df90
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 77 deletions.
15 changes: 9 additions & 6 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ ColumnFamilyData::ColumnFamilyData(
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
const FileOptions& file_options, ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer)
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
: id_(id),
name_(name),
dummy_versions_(_dummy_versions),
Expand Down Expand Up @@ -557,7 +558,7 @@ ColumnFamilyData::ColumnFamilyData(
internal_stats_.reset(
new InternalStats(ioptions_.num_levels, db_options.env, this));
table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
block_cache_tracer));
block_cache_tracer, io_tracer));
if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(ioptions_, &internal_comparator_));
Expand Down Expand Up @@ -1415,20 +1416,22 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
Cache* table_cache,
WriteBufferManager* _write_buffer_manager,
WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer)
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(
ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
block_cache_tracer)),
block_cache_tracer, io_tracer)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
file_options_(file_options),
table_cache_(table_cache),
write_buffer_manager_(_write_buffer_manager),
write_controller_(_write_controller),
block_cache_tracer_(block_cache_tracer) {
block_cache_tracer_(block_cache_tracer),
io_tracer_(io_tracer) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
Expand Down Expand Up @@ -1494,7 +1497,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, file_options_, this, block_cache_tracer_);
*db_options_, file_options_, this, block_cache_tracer_, io_tracer_);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
Expand Down
7 changes: 5 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,8 @@ class ColumnFamilyData {
const ImmutableDBOptions& db_options,
const FileOptions& file_options,
ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer);
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);

std::vector<std::string> GetDbPaths() const;

Expand Down Expand Up @@ -651,7 +652,8 @@ class ColumnFamilySet {
const FileOptions& file_options, Cache* table_cache,
WriteBufferManager* _write_buffer_manager,
WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer);
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);
~ColumnFamilySet();

ColumnFamilyData* GetDefault() const;
Expand Down Expand Up @@ -715,6 +717,7 @@ class ColumnFamilySet {
WriteBufferManager* write_buffer_manager_;
WriteController* write_controller_;
BlockCacheTracer* const block_cache_tracer_;
std::shared_ptr<IOTracer> io_tracer_;
};

// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
Expand Down
8 changes: 4 additions & 4 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ Status ExternalSstFileIngestionJob::Prepare(
db_options_.file_checksum_gen_factory.get(), &generated_checksum,
&generated_checksum_func_name,
ingestion_options_.verify_checksums_readahead_size,
db_options_.allow_mmap_reads);
db_options_.allow_mmap_reads, io_tracer_);
if (!io_s.ok()) {
status = io_s;
ROCKS_LOG_WARN(db_options_.info_log,
Expand Down Expand Up @@ -509,8 +509,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
if (!status.ok()) {
return status;
}
sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file),
external_file));
sst_file_reader.reset(new RandomAccessFileReader(
std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));

status = cfd_->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd_->ioptions(),
Expand Down Expand Up @@ -835,7 +835,7 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
db_options_.file_checksum_gen_factory.get(), &file_checksum,
&file_checksum_func_name,
ingestion_options_.verify_checksums_readahead_size,
db_options_.allow_mmap_reads);
db_options_.allow_mmap_reads, io_tracer_);
if (!io_s.ok()) {
return io_s;
}
Expand Down
4 changes: 2 additions & 2 deletions db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
if (!status.ok()) {
return status;
}
sst_file_reader.reset(
new RandomAccessFileReader(std::move(sst_file), external_file));
sst_file_reader.reset(new RandomAccessFileReader(
std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));

status = cfd_->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd_->ioptions(),
Expand Down
6 changes: 3 additions & 3 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ class Repairer {
// TableCache can be small since we expect each table to be opened
// once.
NewLRUCache(10, db_options_.table_cache_numshardbits)),
table_cache_(new TableCache(default_cf_iopts_, env_options_,
raw_table_cache_.get(),
/*block_cache_tracer=*/nullptr)),
table_cache_(new TableCache(
default_cf_iopts_, env_options_, raw_table_cache_.get(),
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)),
wb_(db_options_.db_write_buffer_size),
wc_(db_options_.delayed_write_rate),
vset_(dbname_, &immutable_db_options_, env_options_,
Expand Down
8 changes: 5 additions & 3 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ const int kLoadConcurency = 128;

TableCache::TableCache(const ImmutableCFOptions& ioptions,
const FileOptions& file_options, Cache* const cache,
BlockCacheTracer* const block_cache_tracer)
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
: ioptions_(ioptions),
file_options_(file_options),
cache_(cache),
immortal_tables_(false),
block_cache_tracer_(block_cache_tracer),
loader_mutex_(kLoadConcurency, GetSliceNPHash64) {
loader_mutex_(kLoadConcurency, GetSliceNPHash64),
io_tracer_(io_tracer) {
if (ioptions_.row_cache) {
// If the same cache is shared by multiple instances, we need to
// disambiguate its entries.
Expand Down Expand Up @@ -126,7 +128,7 @@ Status TableCache::GetTableReader(
StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), fname, ioptions_.env,
std::move(file), fname, ioptions_.env, io_tracer_,
record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
Expand Down
4 changes: 3 additions & 1 deletion db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class TableCache {
public:
TableCache(const ImmutableCFOptions& ioptions,
const FileOptions& storage_options, Cache* cache,
BlockCacheTracer* const block_cache_tracer);
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);
~TableCache();

// Return an iterator for the specified file number (the corresponding
Expand Down Expand Up @@ -226,6 +227,7 @@ class TableCache {
bool immortal_tables_;
BlockCacheTracer* const block_cache_tracer_;
Striped<port::Mutex, Slice> loader_mutex_;
std::shared_ptr<IOTracer> io_tracer_;
};

} // namespace ROCKSDB_NAMESPACE
17 changes: 9 additions & 8 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1284,8 +1284,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
// pass the magic number check in the footer.
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
0 /* hist_type */, nullptr /* file_read_hist */,
std::move(file), file_name, nullptr /* env */, nullptr /* IOTracer */,
nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
nullptr /* rate_limiter */, ioptions->listeners));
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
Expand Down Expand Up @@ -3618,9 +3618,10 @@ VersionSet::VersionSet(const std::string& dbname,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
: column_family_set_(new ColumnFamilySet(
dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller, block_cache_tracer)),
: column_family_set_(
new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller,
block_cache_tracer, io_tracer)),
env_(_db_options->env),
fs_(_db_options->fs, io_tracer),
dbname_(dbname),
Expand Down Expand Up @@ -3660,9 +3661,9 @@ void VersionSet::Reset() {
Cache* table_cache = column_family_set_->get_table_cache();
WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
WriteController* wc = column_family_set_->write_controller();
column_family_set_.reset(new ColumnFamilySet(dbname_, db_options_,
file_options_, table_cache,
wbm, wc, block_cache_tracer_));
column_family_set_.reset(
new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache,
wbm, wc, block_cache_tracer_, io_tracer_));
}
db_id_.clear();
next_file_number_.store(2);
Expand Down
41 changes: 23 additions & 18 deletions env/file_system_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,24 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
// FSSequentialFileTracingWrapper when tracing is disabled.
class FSSequentialFilePtr {
public:
FSSequentialFilePtr() {}
FSSequentialFilePtr() = delete;
FSSequentialFilePtr(std::unique_ptr<FSSequentialFile>&& fs,
const std::shared_ptr<IOTracer>& io_tracer)
: fs_(std::move(fs)), io_tracer_(io_tracer) {
fs_tracer_.reset(new FSSequentialFileTracingWrapper(fs_.get(), io_tracer_));
}
: fs_(std::move(fs)),
io_tracer_(io_tracer),
fs_tracer_(fs_.get(), io_tracer_) {}

FSSequentialFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_.get();
return const_cast<FSSequentialFileTracingWrapper*>(&fs_tracer_);
} else {
return fs_.get();
}
}

FSSequentialFile* get() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_.get();
return const_cast<FSSequentialFileTracingWrapper*>(&fs_tracer_);
} else {
return fs_.get();
}
Expand All @@ -157,7 +157,7 @@ class FSSequentialFilePtr {
private:
std::unique_ptr<FSSequentialFile> fs_;
std::shared_ptr<IOTracer> io_tracer_;
std::unique_ptr<FSSequentialFileTracingWrapper> fs_tracer_;
FSSequentialFileTracingWrapper fs_tracer_;
};

// FSRandomAccessFileTracingWrapper is a wrapper class above FSRandomAccessFile
Expand Down Expand Up @@ -200,27 +200,32 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper {
// FSRandomAccessFileTracingWrapper when tracing is disabled.
class FSRandomAccessFilePtr {
public:
FSRandomAccessFilePtr(FSRandomAccessFile* fs,
std::shared_ptr<IOTracer> io_tracer)
: fs_(fs),
FSRandomAccessFilePtr(std::unique_ptr<FSRandomAccessFile>&& fs,
const std::shared_ptr<IOTracer>& io_tracer)
: fs_(std::move(fs)),
io_tracer_(io_tracer),
fs_tracer_(new FSRandomAccessFileTracingWrapper(fs_, io_tracer_)) {}

explicit FSRandomAccessFilePtr(FSRandomAccessFile* fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
fs_tracer_(fs_.get(), io_tracer_) {}

FSRandomAccessFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;
return const_cast<FSRandomAccessFileTracingWrapper*>(&fs_tracer_);
} else {
return fs_;
return fs_.get();
}
}

FSRandomAccessFile* get() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return const_cast<FSRandomAccessFileTracingWrapper*>(&fs_tracer_);
} else {
return fs_.get();
}
}

private:
FSRandomAccessFile* fs_;
std::unique_ptr<FSRandomAccessFile> fs_;
std::shared_ptr<IOTracer> io_tracer_;
FSRandomAccessFileTracingWrapper* fs_tracer_;
FSRandomAccessFileTracingWrapper fs_tracer_;
};

// FSWritableFileTracingWrapper is a wrapper class above FSWritableFile that
Expand Down
6 changes: 4 additions & 2 deletions file/file_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ IOStatus GenerateOneFileChecksum(FileSystem* fs, const std::string& file_path,
std::string* file_checksum,
std::string* file_checksum_func_name,
size_t verify_checksums_readahead_size,
bool allow_mmap_reads) {
bool allow_mmap_reads,
std::shared_ptr<IOTracer>& io_tracer) {
if (checksum_factory == nullptr) {
return IOStatus::InvalidArgument("Checksum factory is invalid");
}
Expand All @@ -151,7 +152,8 @@ IOStatus GenerateOneFileChecksum(FileSystem* fs, const std::string& file_path,
if (!io_s.ok()) {
return io_s;
}
reader.reset(new RandomAccessFileReader(std::move(r_file), file_path));
reader.reset(new RandomAccessFileReader(std::move(r_file), file_path,
nullptr /*Env*/, io_tracer));
}

// Found that 256 KB readahead size provides the best performance, based on
Expand Down
3 changes: 2 additions & 1 deletion file/file_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ extern IOStatus GenerateOneFileChecksum(
FileSystem* fs, const std::string& file_path,
FileChecksumGenFactory* checksum_factory, std::string* file_checksum,
std::string* file_checksum_func_name,
size_t verify_checksums_readahead_size, bool allow_mmap_reads);
size_t verify_checksums_readahead_size, bool allow_mmap_reads,
std::shared_ptr<IOTracer>& io_tracer);

inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env,
IOOptions& opts) {
Expand Down
23 changes: 5 additions & 18 deletions file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <sstream>
#include <string>

#include "env/file_system_tracer.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
Expand Down Expand Up @@ -64,7 +65,7 @@ class RandomAccessFileReader {

bool ShouldNotifyListeners() const { return !listeners_.empty(); }

std::unique_ptr<FSRandomAccessFile> file_;
FSRandomAccessFilePtr file_;
std::string file_name_;
Env* env_;
Statistics* stats_;
Expand All @@ -76,11 +77,12 @@ class RandomAccessFileReader {
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
Env* _env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
Env* _env = nullptr, const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_(std::move(raf)),
: file_(std::move(raf), io_tracer),
file_name_(std::move(_file_name)),
env_(_env),
stats_(stats),
Expand All @@ -100,21 +102,6 @@ class RandomAccessFileReader {
#endif
}

RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}

RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
env_ = std::move(o.env_);
stats_ = std::move(o.stats_);
hist_type_ = std::move(o.hist_type_);
file_read_hist_ = std::move(o.file_read_hist_);
rate_limiter_ = std::move(o.rate_limiter_);
return *this;
}

RandomAccessFileReader(const RandomAccessFileReader&) = delete;
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;

Expand Down
Loading

0 comments on commit 8e0df90

Please sign in to comment.