diff --git a/env/env_test.cc b/env/env_test.cc index 96f47e83e27..a0aa08f3279 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -1137,8 +1137,25 @@ TEST_P(EnvPosixTestWithParam, MultiRead) { ASSERT_OK(wfile->Close()); } - // Random Read - { + // More attempts to simulate more partial result sequences. + for (uint32_t attempt = 0; attempt < 20; attempt++) { + // Random Read + Random rnd(301 + attempt); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) { + if (attempt > 0) { + // No failure in the first attempt. + size_t& bytes_read = *static_cast(arg); + if (rnd.OneIn(4)) { + bytes_read = 0; + } else if (rnd.OneIn(3)) { + bytes_read = static_cast( + rnd.Uniform(static_cast(bytes_read))); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + std::unique_ptr file; std::vector reqs(3); std::vector> data; @@ -1163,6 +1180,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) { ASSERT_OK(reqs[i].status); ASSERT_EQ(memcmp(reqs[i].scratch, buf.get(), kSectorSize), 0); } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } } @@ -1193,7 +1211,7 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) { ASSERT_OK(wfile->Append(slice)); ASSERT_OK(wfile->InvalidateCache(0, 0)); ASSERT_OK(wfile->Close()); - } + } // Random Read { @@ -1437,7 +1455,7 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) { Slice buf(buf_ptr.get(), data.size()); file->Append(buf); data.append(std::string(4096, 'T')); - } + } std::vector file_attrs; ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs)); diff --git a/env/io_posix.cc b/env/io_posix.cc index 0e2d1d766ff..a2bbab38de1 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -482,9 +482,6 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, const IOOptions& options, IODebugContext* dbg) { #if defined(ROCKSDB_IOURING_PRESENT) - size_t reqs_off; - ssize_t ret __attribute__((__unused__)); - struct io_uring* iu = nullptr; if (thread_local_io_urings_) { iu = static_cast(thread_local_io_urings_->Get()); @@ -505,35 +502,49 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, struct WrappedReadRequest { FSReadRequest* req; struct iovec iov; - explicit WrappedReadRequest(FSReadRequest* r) : req(r) {} + size_t finished_len; + explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {} }; autovector req_wraps; + autovector incomplete_rq_list; for (size_t i = 0; i < num_reqs; i++) { req_wraps.emplace_back(&reqs[i]); } - reqs_off = 0; - while (num_reqs) { - size_t this_reqs = num_reqs; + size_t reqs_off = 0; + while (num_reqs > reqs_off || !incomplete_rq_list.empty()) { + size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size(); // If requests exceed depth, split it into batches if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth; + assert(incomplete_rq_list.size() <= this_reqs); for (size_t i = 0; i < this_reqs; i++) { - size_t index = i + reqs_off; - struct io_uring_sqe* sqe; + WrappedReadRequest* rep_to_submit; + if (i < incomplete_rq_list.size()) { + rep_to_submit = incomplete_rq_list[i]; + } else { + rep_to_submit = &req_wraps[reqs_off++]; + } + assert(rep_to_submit->req->len > rep_to_submit->finished_len); + rep_to_submit->iov.iov_base = + rep_to_submit->req->scratch + rep_to_submit->finished_len; + rep_to_submit->iov.iov_len = + rep_to_submit->req->len - rep_to_submit->finished_len; + struct io_uring_sqe* sqe; sqe = io_uring_get_sqe(iu); - req_wraps[index].iov.iov_base = reqs[index].scratch; - req_wraps[index].iov.iov_len = reqs[index].len; - io_uring_prep_readv(sqe, fd_, &req_wraps[index].iov, 1, - reqs[index].offset); - io_uring_sqe_set_data(sqe, &req_wraps[index]); + io_uring_prep_readv( + sqe, fd_, &rep_to_submit->iov, 1, + rep_to_submit->req->offset + rep_to_submit->finished_len); + io_uring_sqe_set_data(sqe, rep_to_submit); } + incomplete_rq_list.clear(); - ret = io_uring_submit_and_wait(iu, static_cast(this_reqs)); + ssize_t ret = + io_uring_submit_and_wait(iu, static_cast(this_reqs)); if (static_cast(ret) != this_reqs) { fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs); } @@ -547,21 +558,44 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, // of our initial wait not reaping all completions ret = io_uring_wait_cqe(iu, &cqe); assert(!ret); + req_wrap = static_cast(io_uring_cqe_get_data(cqe)); FSReadRequest* req = req_wrap->req; - if (static_cast(cqe->res) == req_wrap->iov.iov_len) { - req->result = Slice(req->scratch, cqe->res); - req->status = IOStatus::OK(); - } else if (cqe->res >= 0) { - req->result = Slice(req->scratch, req_wrap->iov.iov_len - cqe->res); - } else { + if (cqe->res < 0) { req->result = Slice(req->scratch, 0); req->status = IOError("Req failed", filename_, cqe->res); + } else { + size_t bytes_read = static_cast(cqe->res); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read); + if (bytes_read == req_wrap->iov.iov_len) { + req->result = Slice(req->scratch, req->len); + req->status = IOStatus::OK(); + } else if (bytes_read == 0) { + // cqe->res == 0 can means EOF, or can mean partial results. See + // comment + // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 + // Fall back to pread in this case. + Slice tmp_slice; + req->status = + Read(req->offset + req_wrap->finished_len, + req->len - req_wrap->finished_len, options, &tmp_slice, + req->scratch + req_wrap->finished_len, dbg); + req->result = + Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); + } else if (bytes_read < req_wrap->iov.iov_len) { + assert(bytes_read > 0); + assert(bytes_read + req_wrap->finished_len < req->len); + req_wrap->finished_len += bytes_read; + incomplete_rq_list.push_back(req_wrap); + } else { + req->result = Slice(req->scratch, 0); + req->status = IOError("Req returned more bytes than requested", + filename_, cqe->res); + } } io_uring_cqe_seen(iu, cqe); } - num_reqs -= this_reqs; - reqs_off += this_reqs; } return IOStatus::OK(); #else