Skip to content

Commit

Permalink
Handle io_uring partial results (facebook#6441)
Browse files Browse the repository at this point in the history
Summary:
The logic that handles io_uring partial results was wrong. Fix the logic by putting it into a queue and continue reading.
Pull Request resolved: facebook#6441

Test Plan: Make sure this patch fixes the application test case where the bug was discovered; in env_test, add a unit test that simulates partial results and make sure the results are still correct.

Differential Revision: D20018616

fbshipit-source-id: 5398a7e34d74c26d52aa69dfd604e93e95d99c62
  • Loading branch information
siying authored and facebook-github-bot committed Feb 22, 2020
1 parent 890d87f commit 942eaba
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 27 deletions.
26 changes: 22 additions & 4 deletions env/env_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1137,8 +1137,25 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
ASSERT_OK(wfile->Close());
}

// Random Read
{
// More attempts to simulate more partial result sequences.
for (uint32_t attempt = 0; attempt < 20; attempt++) {
// Random Read
Random rnd(301 + attempt);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) {
if (attempt > 0) {
// No failure in the first attempt.
size_t& bytes_read = *static_cast<size_t*>(arg);
if (rnd.OneIn(4)) {
bytes_read = 0;
} else if (rnd.OneIn(3)) {
bytes_read = static_cast<size_t>(
rnd.Uniform(static_cast<int>(bytes_read)));
}
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

std::unique_ptr<RandomAccessFile> file;
std::vector<ReadRequest> reqs(3);
std::vector<std::unique_ptr<char, Deleter>> data;
Expand All @@ -1163,6 +1180,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
ASSERT_OK(reqs[i].status);
ASSERT_EQ(memcmp(reqs[i].scratch, buf.get(), kSectorSize), 0);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
}

Expand Down Expand Up @@ -1193,7 +1211,7 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) {
ASSERT_OK(wfile->Append(slice));
ASSERT_OK(wfile->InvalidateCache(0, 0));
ASSERT_OK(wfile->Close());
}
}

// Random Read
{
Expand Down Expand Up @@ -1437,7 +1455,7 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
Slice buf(buf_ptr.get(), data.size());
file->Append(buf);
data.append(std::string(4096, 'T'));
}
}

std::vector<Env::FileAttributes> file_attrs;
ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs));
Expand Down
80 changes: 57 additions & 23 deletions env/io_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,6 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
const IOOptions& options,
IODebugContext* dbg) {
#if defined(ROCKSDB_IOURING_PRESENT)
size_t reqs_off;
ssize_t ret __attribute__((__unused__));

struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
Expand All @@ -505,35 +502,49 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
struct WrappedReadRequest {
FSReadRequest* req;
struct iovec iov;
explicit WrappedReadRequest(FSReadRequest* r) : req(r) {}
size_t finished_len;
explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {}
};

autovector<WrappedReadRequest, 32> req_wraps;
autovector<WrappedReadRequest*, 4> incomplete_rq_list;

for (size_t i = 0; i < num_reqs; i++) {
req_wraps.emplace_back(&reqs[i]);
}

reqs_off = 0;
while (num_reqs) {
size_t this_reqs = num_reqs;
size_t reqs_off = 0;
while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();

// If requests exceed depth, split it into batches
if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth;

assert(incomplete_rq_list.size() <= this_reqs);
for (size_t i = 0; i < this_reqs; i++) {
size_t index = i + reqs_off;
struct io_uring_sqe* sqe;
WrappedReadRequest* rep_to_submit;
if (i < incomplete_rq_list.size()) {
rep_to_submit = incomplete_rq_list[i];
} else {
rep_to_submit = &req_wraps[reqs_off++];
}
assert(rep_to_submit->req->len > rep_to_submit->finished_len);
rep_to_submit->iov.iov_base =
rep_to_submit->req->scratch + rep_to_submit->finished_len;
rep_to_submit->iov.iov_len =
rep_to_submit->req->len - rep_to_submit->finished_len;

struct io_uring_sqe* sqe;
sqe = io_uring_get_sqe(iu);
req_wraps[index].iov.iov_base = reqs[index].scratch;
req_wraps[index].iov.iov_len = reqs[index].len;
io_uring_prep_readv(sqe, fd_, &req_wraps[index].iov, 1,
reqs[index].offset);
io_uring_sqe_set_data(sqe, &req_wraps[index]);
io_uring_prep_readv(
sqe, fd_, &rep_to_submit->iov, 1,
rep_to_submit->req->offset + rep_to_submit->finished_len);
io_uring_sqe_set_data(sqe, rep_to_submit);
}
incomplete_rq_list.clear();

ret = io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
ssize_t ret =
io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
if (static_cast<size_t>(ret) != this_reqs) {
fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
}
Expand All @@ -547,21 +558,44 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
// of our initial wait not reaping all completions
ret = io_uring_wait_cqe(iu, &cqe);
assert(!ret);

req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
FSReadRequest* req = req_wrap->req;
if (static_cast<size_t>(cqe->res) == req_wrap->iov.iov_len) {
req->result = Slice(req->scratch, cqe->res);
req->status = IOStatus::OK();
} else if (cqe->res >= 0) {
req->result = Slice(req->scratch, req_wrap->iov.iov_len - cqe->res);
} else {
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", filename_, cqe->res);
} else {
size_t bytes_read = static_cast<size_t>(cqe->res);
TEST_SYNC_POINT_CALLBACK(
"PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read);
if (bytes_read == req_wrap->iov.iov_len) {
req->result = Slice(req->scratch, req->len);
req->status = IOStatus::OK();
} else if (bytes_read == 0) {
// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
} else if (bytes_read < req_wrap->iov.iov_len) {
assert(bytes_read > 0);
assert(bytes_read + req_wrap->finished_len < req->len);
req_wrap->finished_len += bytes_read;
incomplete_rq_list.push_back(req_wrap);
} else {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req returned more bytes than requested",
filename_, cqe->res);
}
}
io_uring_cqe_seen(iu, cqe);
}
num_reqs -= this_reqs;
reqs_off += this_reqs;
}
return IOStatus::OK();
#else
Expand Down

0 comments on commit 942eaba

Please sign in to comment.