Skip to content

Commit

Permalink
Implement ReopenWritibaleFile on Windows and other fixes
Browse files Browse the repository at this point in the history
Summary:
Make default impl return NoSupported so the db_blob
  tests exist in a meaningful manner.
  Replace std::thread to port::Thread
Closes facebook#2465

Differential Revision: D5275563

Pulled By: yiwu-arbug

fbshipit-source-id: cedf1a18a2c05e20d768c1308b3f3224dbd70ab6
  • Loading branch information
yuslepukhin authored and facebook-github-bot committed Jun 20, 2017
1 parent c430d69 commit a21db16
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 29 deletions.
2 changes: 1 addition & 1 deletion db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
ASSERT_FALSE(flags[sequence].test_and_set());
}
};
std::vector<std::thread> threads;
std::vector<port::Thread> threads;
for (size_t i = 0; i < kThreads; i++) {
threads.emplace_back(writer, i);
}
Expand Down
3 changes: 1 addition & 2 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ class Env {
virtual Status ReopenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) {
Status s;
return s;
return Status::NotSupported();
}

// Reuse an existing file by renaming it and opening it as writable.
Expand Down
40 changes: 33 additions & 7 deletions port/win/env_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,11 @@ Status WinEnvIO::NewRandomAccessFile(const std::string& fname,
return s;
}

Status WinEnvIO::NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {
Status WinEnvIO::OpenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options,
bool reopen) {

const size_t c_BufferCapacity = 64 * 1024;

EnvOptions local_options(options);
Expand All @@ -264,12 +266,19 @@ Status WinEnvIO::NewWritableFile(const std::string& fname,

if (local_options.use_mmap_writes) {
desired_access |= GENERIC_READ;
} else {
}
else {
// Adding this solely for tests to pass (fault_injection_test,
// wal_manager_test).
shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE);
}

// This will always truncate the file
DWORD creation_disposition = CREATE_ALWAYS;
if (reopen) {
creation_disposition = OPEN_ALWAYS;
}

HANDLE hFile = 0;
{
IOSTATS_TIMER_GUARD(open_nanos);
Expand All @@ -278,7 +287,7 @@ Status WinEnvIO::NewWritableFile(const std::string& fname,
desired_access, // Access desired
shared_mode,
NULL, // Security attributes
CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC
creation_disposition, // Posix env says (reopen) ? (O_CREATE | O_APPEND) : O_CREAT | O_TRUNC
fileFlags, // Flags
NULL); // Template File
}
Expand All @@ -289,6 +298,18 @@ Status WinEnvIO::NewWritableFile(const std::string& fname,
"Failed to create a NewWriteableFile: " + fname, lastError);
}

// We will start writing at the end, appending
if (reopen) {
LARGE_INTEGER zero_move;
zero_move.QuadPart = 0;
BOOL ret = SetFilePointerEx(hFile, zero_move, NULL, FILE_END);
if (!ret) {
auto lastError = GetLastError();
return IOErrorFromWindowsError(
"Failed to create a ReopenWritableFile move to the end: " + fname, lastError);
}
}

if (options.use_mmap_writes) {
// We usually do not use mmmapping on SSD and thus we pass memory
// page_size
Expand All @@ -304,7 +325,7 @@ Status WinEnvIO::NewWritableFile(const std::string& fname,
}

Status WinEnvIO::NewRandomRWFile(const std::string & fname,
unique_ptr<RandomRWFile>* result, const EnvOptions & options) {
std::unique_ptr<RandomRWFile>* result, const EnvOptions & options) {

Status s;

Expand Down Expand Up @@ -933,7 +954,12 @@ Status WinEnv::NewRandomAccessFile(const std::string& fname,
Status WinEnv::NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {
return winenv_io_.NewWritableFile(fname, result, options);
return winenv_io_.OpenWritableFile(fname, result, options, false);
}

Status WinEnv::ReopenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result, const EnvOptions& options) {
return winenv_io_.OpenWritableFile(fname, result, options, true);
}

Status WinEnv::NewRandomRWFile(const std::string & fname,
Expand Down
21 changes: 17 additions & 4 deletions port/win/env_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,16 @@ class WinEnvIO {
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options);

// Helper for NewWritable and ReopenWritableFile
virtual Status OpenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options,
bool reopen);

virtual Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options);

virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options);

// The returned file will only be accessed by one thread at a time.
virtual Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
Expand Down Expand Up @@ -204,6 +206,17 @@ class WinEnv : public Env {
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;

// Create an object that writes to a new file with the specified
// name. Deletes any existing file with the same name and creates a
// new file. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
//
// The returned file will only be accessed by one thread at a time.
Status ReopenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;

// The returned file will only be accessed by one thread at a time.
Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
Expand Down
39 changes: 31 additions & 8 deletions port/win/io_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ Status WinSequentialFile::InvalidateCache(size_t offset, size_t length) {
//////////////////////////////////////////////////////////////////////////////////////////////////
/// WinRandomAccessBase

inline
SSIZE_T WinRandomAccessImpl::PositionedReadInternal(char* src,
size_t numBytes,
uint64_t offset) const {
Expand Down Expand Up @@ -733,13 +734,31 @@ Status WinWritableImpl::PreallocateInternal(uint64_t spaceToReserve) {
return fallocate(file_data_->GetName(), file_data_->GetFileHandle(), spaceToReserve);
}

inline
WinWritableImpl::WinWritableImpl(WinFileData* file_data, size_t alignment)
: file_data_(file_data),
alignment_(alignment),
filesize_(0),
next_write_offset_(0),
reservedsize_(0) {

// Query current position in case ReopenWritableFile is called
// This position is only important for buffered writes
// for unbuffered writes we explicitely specify the position.
LARGE_INTEGER zero_move;
zero_move.QuadPart = 0; // Do not move
LARGE_INTEGER pos;
pos.QuadPart = 0;
BOOL ret = SetFilePointerEx(file_data_->GetFileHandle(), zero_move, &pos,
FILE_CURRENT);
// Querying no supped to fail
if (ret) {
next_write_offset_ = pos.QuadPart;
} else {
assert(false);
}
}

inline
Status WinWritableImpl::AppendImpl(const Slice& data) {

Status s;
Expand All @@ -754,12 +773,12 @@ Status WinWritableImpl::AppendImpl(const Slice& data) {
// With no offset specified we are appending
// to the end of the file

assert(IsSectorAligned(filesize_));
assert(IsSectorAligned(next_write_offset_));
assert(IsSectorAligned(data.size()));
assert(IsAligned(GetAlignement(), data.data()));

SSIZE_T ret = pwrite(file_data_->GetFileHandle(), data.data(),
data.size(), filesize_);
data.size(), next_write_offset_);

if (ret < 0) {
auto lastError = GetLastError();
Expand Down Expand Up @@ -787,12 +806,13 @@ Status WinWritableImpl::AppendImpl(const Slice& data) {

if(s.ok()) {
assert(written == data.size());
filesize_ += data.size();
next_write_offset_ += data.size();
}

return s;
}

inline
Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset) {

if(file_data_->use_direct_io()) {
Expand All @@ -816,8 +836,8 @@ Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset)
// For sequential write this would be simple
// size extension by data.size()
uint64_t write_end = offset + data.size();
if (write_end >= filesize_) {
filesize_ = write_end;
if (write_end >= next_write_offset_) {
next_write_offset_ = write_end;
}
}
return s;
Expand All @@ -830,11 +850,12 @@ Status WinWritableImpl::TruncateImpl(uint64_t size) {
Status s = ftruncate(file_data_->GetName(), file_data_->GetFileHandle(),
size);
if (s.ok()) {
filesize_ = size;
next_write_offset_ = size;
}
return s;
}

inline
Status WinWritableImpl::CloseImpl() {

Status s;
Expand All @@ -857,6 +878,7 @@ Status WinWritableImpl::CloseImpl() {
return s;
}

inline
Status WinWritableImpl::SyncImpl() {
Status s;
// Calls flush buffers
Expand All @@ -869,6 +891,7 @@ Status WinWritableImpl::SyncImpl() {
}


inline
Status WinWritableImpl::AllocateImpl(uint64_t offset, uint64_t len) {
Status status;
TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds);
Expand Down Expand Up @@ -943,7 +966,7 @@ Status WinWritableFile::Sync() {
Status WinWritableFile::Fsync() { return SyncImpl(); }

uint64_t WinWritableFile::GetFileSize() {
return GetFileSizeImpl();
return GetFileNextWriteOffset();
}

Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) {
Expand Down
6 changes: 3 additions & 3 deletions port/win/io_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class WinWritableImpl {
protected:
WinFileData* file_data_;
const uint64_t alignment_;
uint64_t filesize_; // How much data is actually written disk
uint64_t next_write_offset_; // Needed because Windows does not support O_APPEND
uint64_t reservedsize_; // how far we have reserved space

virtual Status PreallocateInternal(uint64_t spaceToReserve);
Expand All @@ -324,14 +324,14 @@ class WinWritableImpl {

Status SyncImpl();

uint64_t GetFileSizeImpl() {
uint64_t GetFileNextWriteOffset() {
// Double accounting now here with WritableFileWriter
// and this size will be wrong when unbuffered access is used
// but tests implement their own writable files and do not use
// WritableFileWrapper
// so we need to squeeze a square peg through
// a round hole here.
return filesize_;
return next_write_offset_;
}

Status AllocateImpl(uint64_t offset, uint64_t len);
Expand Down
5 changes: 4 additions & 1 deletion util/timer_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
// work, even for commercial purposes, all without asking permission.

#pragma once

#include "port/port.h"

#include <assert.h>
#include <chrono>
#include <condition_variable>
Expand Down Expand Up @@ -213,5 +216,5 @@ class TimerQueue {
public:
std::vector<WorkItem>& getContainer() { return this->c; }
} m_items;
std::thread m_th;
rocksdb::port::Thread m_th;
};
6 changes: 3 additions & 3 deletions utilities/blob_db/blob_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,11 @@ TEST_F(BlobDBTest, Compression) {
TEST_F(BlobDBTest, DISABLED_MultipleWriters) {
Open();

std::vector<std::thread> workers;
std::vector<port::Thread> workers;
for (size_t ii = 0; ii < 10; ii++)
workers.push_back(std::thread(&BlobDBTest::InsertBlobs, this));
workers.push_back(port::Thread(&BlobDBTest::InsertBlobs, this));

for (std::thread &t : workers) {
for (auto& t : workers) {
if (t.joinable()) {
t.join();
}
Expand Down

0 comments on commit a21db16

Please sign in to comment.