Skip to content

Commit

Permalink
Sort per-file blob read requests by offset (facebook#8953)
Browse files Browse the repository at this point in the history
Summary:
`RandomAccessFileReader::MultiRead()` tries to merge requests in direct IO, assuming input IO requests are
sorted by offsets.

Add a test in direct IO mode.

Pull Request resolved: facebook#8953

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D31183546

Pulled By: riversand963

fbshipit-source-id: 5d043ec68e2daa47a3149066150afd41ee3d73e6
  • Loading branch information
riversand963 authored and facebook-github-bot committed Sep 25, 2021
1 parent 6d424be commit b92cef2
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 7 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fixes a bug in directed IO mode when calling MultiGet() for blobs in the same blob file. The bug is caused by not sorting the blob read requests by file offsets.
### New Features
### Public API change
* Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.
Expand Down
191 changes: 191 additions & 0 deletions db/blob/db_blob_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,197 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) {
}
}

#ifndef ROCKSDB_LITE
TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
Options options = GetDefaultOptions();

// First, create an external SST file ["b"].
const std::string file_path = dbname_ + "/test.sst";
{
SstFileWriter sst_file_writer(EnvOptions(), GetDefaultOptions());
Status s = sst_file_writer.Open(file_path);
ASSERT_OK(s);
ASSERT_OK(sst_file_writer.Put("b", "b_value"));
ASSERT_OK(sst_file_writer.Finish());
}

options.enable_blob_files = true;
options.min_blob_size = 1000;
options.use_direct_reads = true;
options.allow_ingest_behind = true;

// Open DB with fixed-prefix sst-partitioner so that compaction will cut
// new table file when encountering a new key whose 1-byte prefix changes.
constexpr size_t key_len = 1;
options.sst_partitioner_factory =
NewSstPartitionerFixedPrefixFactory(key_len);

Status s = TryReopen(options);
if (s.IsInvalidArgument()) {
ROCKSDB_GTEST_SKIP("This test requires direct IO support");
return;
}
ASSERT_OK(s);

constexpr size_t num_keys = 3;
constexpr size_t blob_size = 3000;

constexpr char first_key[] = "a";
const std::string first_blob(blob_size, 'a');
ASSERT_OK(Put(first_key, first_blob));

constexpr char second_key[] = "b";
const std::string second_blob(2 * blob_size, 'b');
ASSERT_OK(Put(second_key, second_blob));

constexpr char third_key[] = "d";
const std::string third_blob(blob_size, 'd');
ASSERT_OK(Put(third_key, third_blob));

// first_blob, second_blob and third_blob in the same blob file.
// SST Blob file
// L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'|
// | | | ^ ^ ^
// | | | | | |
// | | +---------|-------|--------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_OK(Flush());

constexpr char fourth_key[] = "c";
const std::string fourth_blob(blob_size, 'c');
ASSERT_OK(Put(fourth_key, fourth_blob));
// fourth_blob in another blob file.
// SST Blob file SST Blob file
// L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'| ["c"] |'cccc'|
// | | | ^ ^ ^ | ^
// | | | | | | | |
// | | +---------|-------|--------+ +-------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_OK(Flush());

ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));

// Due to the above sst partitioner, we get 4 L1 files. The blob files are
// unchanged.
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["b"] ["c"] | | ["d"] |
// | | | | | |
// | | +---------|-------|---------------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));

{
// Ingest the external SST file into bottommost level.
std::vector<std::string> ext_files{file_path};
IngestExternalFileOptions opts;
opts.ingest_behind = true;
ASSERT_OK(
db_->IngestExternalFile(db_->DefaultColumnFamily(), ext_files, opts));
}

// Now the database becomes as follows.
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["b"] ["c"] | | ["d"] |
// | | | | | |
// | | +---------|-------|---------------+
// | +-----------------|-------+
// +-------------------------+
//
// L6 ["b"]

{
// Compact ["b"] to bottommost level.
Slice begin = Slice(second_key);
Slice end = Slice(second_key);
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, &begin, &end));
}

// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["c"] | | ["d"] |
// | | | | |
// | +---------|-------|---------------+
// | +-----------------|-------+
// +-------|-----------------+
// |
// L6 ["b"]
ASSERT_EQ(3, NumTableFilesAtLevel(/*level=*/1));
ASSERT_EQ(1, NumTableFilesAtLevel(/*level=*/6));

bool called = false;
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* arg) {
auto* aligned_reqs = static_cast<std::vector<FSReadRequest>*>(arg);
assert(aligned_reqs);
ASSERT_EQ(1, aligned_reqs->size());
called = true;
});
SyncPoint::GetInstance()->EnableProcessing();

std::array<Slice, num_keys> keys{{first_key, third_key, second_key}};

{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;

// The MultiGet(), when constructing the KeyContexts, will process the keys
// in such order: a, d, b. The reason is that ["a"] and ["d"] are in L1,
// while ["b"] resides in L6.
// Consequently, the original FSReadRequest list prepared by
// Version::MultiGetblob() will be for "a", "d" and "b". It is unsorted as
// follows:
//
// ["a", offset=30, len=3033],
// ["d", offset=9096, len=3033],
// ["b", offset=3063, len=6033]
//
// If we do not sort them before calling MultiRead() in DirectIO, then the
// underlying IO merging logic will yield two requests.
//
// [offset=0, len=4096] (for "a")
// [offset=0, len=12288] (result of merging the request for "d" and "b")
//
// We need to sort them in Version::MultiGetBlob() so that the underlying
// IO merging logic in DirectIO mode works as expected. The correct
// behavior will be one aligned request:
//
// [offset=0, len=12288]

db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

ASSERT_TRUE(called);

ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_blob);

ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], third_blob);

ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], second_blob);
}
}
#endif // !ROCKSDB_LITE

TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
Expand Down
11 changes: 9 additions & 2 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1866,7 +1866,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,

void Version::MultiGetBlob(
const ReadOptions& read_options, MultiGetRange& range,
const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
if (read_options.read_tier == kBlockCacheTier) {
Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
for (const auto& elem : blob_rqs) {
Expand Down Expand Up @@ -1916,7 +1916,14 @@ void Version::MultiGetBlob(
const CompressionType compression =
blob_file_reader.GetValue()->GetCompressionType();

// TODO: sort blobs_in_file by file offset.
// sort blobs_in_file by file offset.
std::sort(
blobs_in_file.begin(), blobs_in_file.end(),
[](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
assert(lhs.first.file_number() == rhs.first.file_number());
return lhs.first.offset() < rhs.first.offset();
});

autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
autovector<std::reference_wrapper<const Slice>> user_keys;
autovector<uint64_t> offsets;
Expand Down
10 changes: 5 additions & 5 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -713,11 +713,11 @@ class Version {
const BlobIndex& blob_index, PinnableSlice* value,
uint64_t* bytes_read) const;

using BlobReadRequests = std::vector<
std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>>;
void MultiGetBlob(
const ReadOptions& read_options, MultiGetRange& range,
const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs);
using BlobReadRequest =
std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>;
using BlobReadRequests = std::vector<BlobReadRequest>;
void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range,
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs);

// Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set.
Expand Down

0 comments on commit b92cef2

Please sign in to comment.