Remove double buffering on RandomRead on Windows.
Summary:
Remove double buffering on RandomRead on Windows.
  With more logic appearing in the file reader/writer, Read no longer
  simply forwards calls to the Windows implementation.
  Previously, direct_io (unbuffered) was available only on Windows,
  but it is now supported generically, so we remove the intermediate
  buffering on Windows.
  Remove the random_access_max_buffer_size option, which was
  Windows-specific; non-zero values for that option introduced
  unnecessary lock contention.
  Remove Env::EnableReadAhead() and Env::ShouldForwardRawRequest(),
  which are no longer necessary.
  Add aligned buffer reads for cases where a requested read exceeds
  the read-ahead size (the page-boundary math is sketched below).
Closes facebook#2105

Differential Revision: D4847770

Pulled By: siying

fbshipit-source-id: 8ab48f8e854ab498a4fd398a6934859792a2788f
yuslepukhin authored and facebook-github-bot committed Apr 27, 2017
1 parent e15382c commit cdad04b
Showing 10 changed files with 32 additions and 311 deletions.
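The summary's aligned-read item (and the CalculateReadParameters helper removed below) boils down to page-boundary math: a direct-I/O read is widened to cover every page the request touches. Here is a self-contained sketch of that calculation; the alignment constant and offsets are illustrative, and TruncateToPageBoundary is re-created here rather than taken from RocksDB:

```cpp
#include <cstdint>
#include <cstdio>

// Round `offset` down to a page boundary; `alignment` must be a power of two.
// Stands in for the role RocksDB's TruncateToPageBoundary helper plays.
static uint64_t TruncateToPageBoundary(size_t alignment, uint64_t offset) {
  return offset - (offset % alignment);
}

int main() {
  const size_t alignment = 4096;  // hypothetical sector size
  const uint64_t offset = 10000;  // user-requested offset
  const size_t n = 3000;          // user-requested byte count

  // Widen [offset, offset + n) to whole pages; the caller later copies the
  // n-byte window starting at (offset - first_page) out of the aligned buffer.
  const uint64_t first_page = TruncateToPageBoundary(alignment, offset);
  const uint64_t last_page = TruncateToPageBoundary(alignment, offset + n - 1);
  const size_t to_read =
      static_cast<size_t>(last_page - first_page) + alignment;

  std::printf("aligned read: %zu bytes at offset %llu\n", to_read,
              static_cast<unsigned long long>(first_page));
  // Prints: aligned read: 8192 bytes at offset 8192
  return 0;
}
```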
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -7,6 +7,8 @@

## 5.4.0 (04/11/2017)
### Public API Change
* random_access_max_buffer_size no longer has any effect
* Removed Env::EnableReadAhead(), Env::ShouldForwardRawRequest()
* Support dynamically change `stats_dump_period_sec` option via SetDBOptions().
* Added ReadOptions::max_skippable_internal_keys to set a threshold to fail a request as incomplete when too many keys are being skipped when using iterators.
* DB::Get accepts PinnableSlice in place of std::string, which avoids the extra memcpy of the value into std::string in most cases.
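A minimal sketch of the PinnableSlice overload that last entry describes, assuming a RocksDB release that ships this Get signature; GetPinned is a hypothetical helper name:

```cpp
#include "rocksdb/db.h"

// Fetches `key` without the extra memcpy into a std::string.
rocksdb::Status GetPinned(rocksdb::DB* db, const rocksdb::Slice& key) {
  rocksdb::PinnableSlice value;  // pins the underlying block data
  rocksdb::Status s =
      db->Get(rocksdb::ReadOptions(), db->DefaultColumnFamily(), key, &value);
  if (s.ok()) {
    // value.data()/value.size() remain valid until `value` is Reset
    // or destroyed.
  }
  return s;
}
```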
1 change: 1 addition & 0 deletions db/compaction_iterator.cc
@@ -6,6 +6,7 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "db/compaction_iterator.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"

namespace rocksdb {
21 changes: 0 additions & 21 deletions include/rocksdb/env.h
@@ -510,16 +510,6 @@ class RandomAccessFile {
return Status::OK();
}

// Used by the file_reader_writer to decide if the ReadAhead wrapper
// should simply forward the call and do not enact buffering or locking.
virtual bool ShouldForwardRawRequest() const {
return false;
}

// For cases when read-ahead is implemented in the platform dependent
// layer
virtual void EnableReadAhead() {}

// Tries to get an unique ID for this file that will be the same each time
// the file is opened (and will stay the same while the file is open).
// Furthermore, it tries to make this ID at most "max_size" bytes. If such an
@@ -751,17 +741,6 @@ class RandomRWFile {
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }

// Used by the file_reader_writer to decide if the ReadAhead wrapper
// should simply forward the call and do not enact read_ahead buffering or locking.
// The implementation below takes care of reading ahead
virtual bool ShouldForwardRawRequest() const {
return false;
}

// For cases when read-ahead is implemented in the platform dependent
// layer. This is when ShouldForwardRawRequest() returns true.
virtual void EnableReadAhead() {}

// Write bytes in `data` at offset `offset`, Returns Status::OK() on success.
// Pass aligned buffer when use_direct_io() returns true.
virtual Status Write(uint64_t offset, const Slice& data) = 0;
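With the forwarding hooks gone, the direct-I/O contract left in this header is GetRequiredBufferAlignment() plus the aligned-buffer requirement noted above Write(). A hedged caller-side sketch follows; the 4 KB alignment is assumed rather than queried from a real file, and RocksDB itself uses its AlignedBuffer helper instead:

```cpp
#include <cstdlib>
#include <cstring>
#if defined(_WIN32)
#include <malloc.h>
#endif

// Assumed value of GetRequiredBufferAlignment() for this sketch.
static constexpr size_t kAlignment = 4096;

int main() {
  const size_t len = 2 * kAlignment;  // length must also be page-aligned

  // Portable aligned allocation for the write buffer.
#if defined(_WIN32)
  char* buf = static_cast<char*>(_aligned_malloc(len, kAlignment));
#else
  char* buf = static_cast<char*>(std::aligned_alloc(kAlignment, len));
#endif
  if (buf == nullptr) return 1;
  std::memset(buf, 'x', len);

  // ... when use_direct_io() is true, file->Write(offset, Slice(buf, len))
  // requires both `buf` and `offset` to be multiples of kAlignment ...

#if defined(_WIN32)
  _aligned_free(buf);
#else
  std::free(buf);
#endif
  return 0;
}
```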
6 changes: 3 additions & 3 deletions options/options_settable_test.cc
@@ -45,7 +45,7 @@ namespace rocksdb {
// settable through string on limited platforms as it depends on behavior of
// compilers.
#ifndef ROCKSDB_LITE
#ifdef OS_LINUX
#if defined OS_LINUX || defined OS_WIN
#ifndef __clang__

class OptionsSettableTest : public testing::Test {
@@ -54,7 +54,7 @@ class OptionsSettableTest : public testing::Test {
};

const char kSpecialChar = 'z';
typedef std::vector<std::pair<int, size_t>> OffsetGap;
typedef std::vector<std::pair<size_t, size_t>> OffsetGap;

void FillWithSpecialChar(char* start_ptr, size_t total_size,
const OffsetGap& blacklist) {
@@ -446,7 +446,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
delete[] new_options_ptr;
}
#endif // !__clang__
#endif // OS_LINUX
#endif // OS_LINUX || OS_WIN
#endif // !ROCKSDB_LITE

} // namespace rocksdb
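For context on the OffsetGap change above: the test fills an options struct with a special char, zeroes the blacklisted (offset, size) windows, and byte-compares the rest; offsetof() yields size_t, hence std::pair<size_t, size_t>. A toy sketch, with a hypothetical Opts struct standing in for rocksdb's options types:

```cpp
#include <cstddef>
#include <cstring>
#include <utility>
#include <vector>

struct Opts {
  int num;
  char name[8];  // suppose this field cannot be byte-compared
  double ratio;
};

// (offset, size) pairs of fields to skip during the byte comparison.
typedef std::vector<std::pair<size_t, size_t>> OffsetGap;

int main() {
  const OffsetGap blacklist = {{offsetof(Opts, name), sizeof(Opts::name)}};
  Opts o;
  std::memset(&o, 'z', sizeof(o));  // FillWithSpecialChar analogue
  for (const auto& gap : blacklist) {
    std::memset(reinterpret_cast<char*>(&o) + gap.first, 0, gap.second);
  }
  return 0;
}
```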
206 changes: 14 additions & 192 deletions port/win/io_win.cc
@@ -638,71 +638,6 @@ Status WinSequentialFile::InvalidateCache(size_t offset, size_t length) {
//////////////////////////////////////////////////////////////////////////////////////////////////
/// WinRandomAccessBase

// Helper
void CalculateReadParameters(size_t alignment, uint64_t offset,
size_t bytes_requested,
size_t& actual_bytes_toread,
uint64_t& first_page_start) {

first_page_start = TruncateToPageBoundary(alignment, offset);
const uint64_t last_page_start =
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
actual_bytes_toread = (last_page_start - first_page_start) + alignment;
}

SSIZE_T WinRandomAccessImpl::ReadIntoBuffer(uint64_t user_offset,
uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
AlignedBuffer& buffer, char* dest) const {
assert(buffer.CurrentSize() == 0);
assert(buffer.Capacity() >= bytes_to_read);

SSIZE_T read =
PositionedReadInternal(buffer.Destination(), bytes_to_read,
first_page_start);

if (read > 0) {
buffer.Size(read);

// Let's figure out how much we read from the users standpoint
if ((first_page_start + buffer.CurrentSize()) > user_offset) {
assert(first_page_start <= user_offset);
size_t buffer_offset = user_offset - first_page_start;
read = buffer.Read(dest, buffer_offset, left);
} else {
read = 0;
}
left -= read;
}
return read;
}

SSIZE_T WinRandomAccessImpl::ReadIntoOneShotBuffer(uint64_t user_offset,
uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
char* dest) const {
AlignedBuffer bigBuffer;
bigBuffer.Alignment(buffer_.Alignment());
bigBuffer.AllocateNewBuffer(bytes_to_read);

return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
bigBuffer, dest);
}

SSIZE_T WinRandomAccessImpl::ReadIntoInstanceBuffer(uint64_t user_offset,
uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
char* dest) const {
SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
left, buffer_, dest);

if (read > 0) {
buffered_start_ = first_page_start;
}

return read;
}

SSIZE_T WinRandomAccessImpl::PositionedReadInternal(char* src,
size_t numBytes,
uint64_t offset) const {
@@ -714,112 +649,36 @@ WinRandomAccessImpl::WinRandomAccessImpl(WinFileData* file_base,
size_t alignment,
const EnvOptions& options) :
file_base_(file_base),
read_ahead_(false),
compaction_readahead_size_(options.compaction_readahead_size),
random_access_max_buffer_size_(options.random_access_max_buffer_size),
buffer_(),
buffered_start_(0) {
alignment_(alignment) {

assert(!options.use_mmap_reads);

// Do not allocate the buffer either until the first request or
// until there is a call to allocate a read-ahead buffer
buffer_.Alignment(alignment);
}

inline
Status WinRandomAccessImpl::ReadImpl(uint64_t offset, size_t n, Slice* result,
char* scratch) const {

Status s;
SSIZE_T r = -1;
size_t left = n;
char* dest = scratch;

// Check buffer alignment
if (file_base_->use_direct_io()) {
if (!IsAligned(alignment_, scratch)) {
return Status::InvalidArgument("WinRandomAccessImpl::ReadImpl: scratch is not properly aligned");
}
}

if (n == 0) {
*result = Slice(scratch, 0);
return s;
}

// When in direct I/O mode we need to do the following changes:
// - use our own aligned buffer
// - always read at the offset of that is a multiple of alignment
if (file_base_->use_direct_io()) {
uint64_t first_page_start = 0;
size_t actual_bytes_toread = 0;
size_t bytes_requested = left;

if (!read_ahead_ && random_access_max_buffer_size_ == 0) {
CalculateReadParameters(buffer_.Alignment(), offset, bytes_requested,
actual_bytes_toread,
first_page_start);

assert(actual_bytes_toread > 0);

r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
} else {

std::unique_lock<std::mutex> lock(buffer_mut_);

// Let's see if at least some of the requested data is already
// in the buffer
if (offset >= buffered_start_ &&
offset < (buffered_start_ + buffer_.CurrentSize())) {
size_t buffer_offset = offset - buffered_start_;
r = buffer_.Read(dest, buffer_offset, left);
assert(r >= 0);

left -= size_t(r);
offset += r;
dest += r;
}

// Still some left or none was buffered
if (left > 0) {
// Figure out the start/end offset for reading and amount to read
bytes_requested = left;

if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
bytes_requested = compaction_readahead_size_;
}

CalculateReadParameters(buffer_.Alignment(), offset, bytes_requested,
actual_bytes_toread,
first_page_start);

assert(actual_bytes_toread > 0);

if (buffer_.Capacity() < actual_bytes_toread) {
// If we are in read-ahead mode or the requested size
// exceeds max buffer size then use one-shot
// big buffer otherwise reallocate main buffer
if (read_ahead_ ||
(actual_bytes_toread > random_access_max_buffer_size_)) {
// Unlock the mutex since we are not using instance buffer
lock.unlock();
r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
} else {
buffer_.AllocateNewBuffer(actual_bytes_toread);
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
} else {
buffer_.Clear();
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
}
}
} else {
r = PositionedReadInternal(scratch, left, offset);
if (r > 0) {
left -= r;
}
}
size_t left = n;
char* dest = scratch;

if (r < 0) {
SSIZE_T r = PositionedReadInternal(scratch, left, offset);
if (r > 0) {
left -= r;
} else if (r < 0) {
auto lastError = GetLastError();
// Posix impl wants to treat reads from beyond
// of the file as OK.
@@ -833,23 +692,6 @@ Status WinRandomAccessImpl::ReadImpl(uint64_t offset, size_t n, Slice* result,
return s;
}

inline
void WinRandomAccessImpl::HintImpl(RandomAccessFile::AccessPattern pattern) {
if (pattern == RandomAccessFile::SEQUENTIAL && file_base_->use_direct_io() &&
compaction_readahead_size_ > 0) {
std::lock_guard<std::mutex> lg(buffer_mut_);
if (!read_ahead_) {
read_ahead_ = true;
// This would allocate read-ahead size + 2 alignments
// - one for memory alignment which added implicitly by AlignedBuffer
// - We add one more alignment because we will read one alignment more
// from disk
buffer_.AllocateNewBuffer(compaction_readahead_size_ +
buffer_.Alignment());
}
}
}

///////////////////////////////////////////////////////////////////////////////////////////////////
/// WinRandomAccessFile

@@ -867,18 +709,6 @@ Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
return ReadImpl(offset, n, result, scratch);
}

void WinRandomAccessFile::EnableReadAhead() {
HintImpl(SEQUENTIAL);
}

bool WinRandomAccessFile::ShouldForwardRawRequest() const {
return true;
}

void WinRandomAccessFile::Hint(AccessPattern pattern) {
HintImpl(pattern);
}

Status WinRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
return Status::OK();
}
@@ -1136,14 +966,6 @@ size_t WinRandomRWFile::GetRequiredBufferAlignment() const {
return GetAlignement();
}

bool WinRandomRWFile::ShouldForwardRawRequest() const {
return true;
}

void WinRandomRWFile::EnableReadAhead() {
HintImpl(RandomAccessFile::SEQUENTIAL);
}

Status WinRandomRWFile::Write(uint64_t offset, const Slice & data) {
return PositionedAppendImpl(data, offset);
}
