Stall writes in WriteBufferManager when memory_usage exceeds buffer_size (facebook#7898)

Summary:
When a WriteBufferManager is shared across DBs and column families
to keep memory usage under a limit, OOMs have been observed when flush cannot
finish but writes keep inserting into memtables.
To avoid OOMs, when memory usage goes beyond buffer_size_ and a DB tries to write,
this change stalls incoming writers until flush completes and memory usage
drops.

Design: Stall condition: When total memory usage reaches or exceeds WriteBufferManager::buffer_size_
(memory_usage() >= buffer_size_), WriteBufferManager::ShouldStall() returns true.

DBImpl first blocks incoming/future writers by calling write_thread_.BeginWriteStall()
(which adds a dummy stall object to the writer queue).
The DB is then blocked in state State::BLOCKED (the current write doesn't go
through). The WBMStallInterface object maintained by every DB instance is added to the queue of
WriteBufferManager.

If multiple DBs try to write during this stall, they are also
blocked, because their check of WriteBufferManager::ShouldStall() returns true.

End Stall condition: When flush finishes and memory usage goes down, the stall ends only if the memory
waiting to be flushed is less than buffer_size/2. This lower limit gives flush time
to complete and avoids continuous stalling if memory usage remains close to buffer_size.
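
The begin and end thresholds described above can be illustrated with a minimal
standalone sketch. This is not the WriteBufferManager code from this change:
the class name StallThresholdSketch and its Reserve()/Free() helpers are
hypothetical, and it assumes a single usage counter stands in for both
"memory usage" and "memory waiting to be flushed".

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stand-in for the accounting side of WriteBufferManager.
// Assumption: one counter approximates both total usage and the memory
// still waiting to be flushed.
class StallThresholdSketch {
 public:
  explicit StallThresholdSketch(std::size_t buffer_size)
      : buffer_size_(buffer_size), memory_usage_(0) {}

  // Called when a memtable allocates memory.
  void Reserve(std::size_t bytes) {
    memory_usage_.fetch_add(bytes, std::memory_order_relaxed);
  }

  // Called when a flush releases memtable memory.
  void Free(std::size_t bytes) {
    memory_usage_.fetch_sub(bytes, std::memory_order_relaxed);
  }

  std::size_t memory_usage() const {
    return memory_usage_.load(std::memory_order_relaxed);
  }

  // Stall condition from the summary: memory_usage() >= buffer_size_.
  bool ShouldStall() const {
    return buffer_size_ > 0 && memory_usage() >= buffer_size_;
  }

  // End-stall condition from the summary: usage below buffer_size_ / 2.
  bool ShouldEndStall() const { return memory_usage() < buffer_size_ / 2; }

 private:
  const std::size_t buffer_size_;
  std::atomic<std::size_t> memory_usage_;
};
```

With buffer_size = 100, the sketch starts stalling at a usage of 100 and only
reports the end of the stall once usage falls to 49 or less; between 50 and 99
neither condition holds, which is the gap that prevents repeated
stall/unstall cycles near the limit.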

WriteBufferManager::EndWriteStall() is then called,
which removes all instances from its queue and signals them to continue.
Their state is changed to State::RUNNING and they are unblocked. DBImpl
then signals all incoming writers of that DB to continue by calling
write_thread_.EndWriteStall() (which removes the dummy stall object from the
queue).

Each DB instance creates a WBMStallInterface, which is an interface to block and
signal DBs during a stall.
When a DB needs to be blocked or signalled by WriteBufferManager,
its state_ is changed accordingly (RUNNING or BLOCKED).
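
The block/signal handshake described above can be sketched with the C++
standard library. This is an illustrative analogue, not the code in this
change: StallHandleSketch and StallQueueSketch are hypothetical names, and
std::mutex / std::condition_variable are used in place of port::Mutex and
port::CondVar.

```cpp
#include <condition_variable>
#include <cstddef>
#include <list>
#include <mutex>

// Hypothetical analogue of the per-DB stall interface.
class StallHandleSketch {
 public:
  enum class State { BLOCKED, RUNNING };

  void SetState(State s) {
    std::lock_guard<std::mutex> lock(mu_);
    state_ = s;
  }

  State state() {
    std::lock_guard<std::mutex> lock(mu_);
    return state_;
  }

  // Waits while the state is BLOCKED. The predicate form of wait()
  // re-checks the state after every wakeup, so spurious wakeups are safe.
  void Block() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return state_ == State::RUNNING; });
  }

  // Marks the handle RUNNING and wakes any thread waiting in Block().
  void Signal() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      state_ = State::RUNNING;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  State state_ = State::RUNNING;
};

// Hypothetical analogue of the manager-side queue of stalled DBs.
class StallQueueSketch {
 public:
  void BeginWriteStall(StallHandleSketch* h) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.push_back(h);
  }

  // Removes every queued handle, then signals each one to continue.
  void EndWriteStall() {
    std::list<StallHandleSketch*> released;
    {
      std::lock_guard<std::mutex> lock(mu_);
      released.swap(queue_);
    }
    for (StallHandleSketch* h : released) h->Signal();
  }

  // Drops a handle without signalling it, e.g. when its DB is closing.
  void RemoveDBFromQueue(StallHandleSketch* h) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.remove(h);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(mu_);
    return queue_.size();
  }

 private:
  std::mutex mu_;
  std::list<StallHandleSketch*> queue_;
};
```

A writer thread would call SetState(State::BLOCKED), register itself with
BeginWriteStall(), and then wait in Block(); a flush thread would call
EndWriteStall() once the end-stall condition holds. Signalling outside the
queue lock keeps the manager from holding its mutex while waking writers,
which is a design choice of this sketch rather than something stated in the
summary.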

Pull Request resolved: facebook#7898

Test Plan: Added a new test db/db_write_buffer_manager_test.cc

Reviewed By: anand1976

Differential Revision: D26093227

Pulled By: akankshamahajan15

fbshipit-source-id: 2bbd982a3fb7033f6de6153aa92a221249861aae
akankshamahajan15 authored and facebook-github-bot committed Apr 21, 2021
1 parent 95f6add commit 596e900
Showing 11 changed files with 1,091 additions and 31 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
@@ -5,6 +5,9 @@
* Fixed a bug where ingested files were written with incorrect boundary key metadata. In rare cases this could have led to a level's files being wrongly ordered and queries for the boundary keys returning wrong results.
* Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`.

### New Features
* Added a new option allow_stall, passed when constructing a WriteBufferManager. When allow_stall is set, WriteBufferManager stalls all writers of the DBs and column families that share it if memory usage goes beyond the specified WriteBufferManager::buffer_size (soft limit). The stall is cleared when memory is freed after flush and memory usage drops below buffer_size.

## 6.20.0 (04/16/2021)
### Behavior Changes
* `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush.
3 changes: 3 additions & 0 deletions Makefile
@@ -1859,6 +1859,9 @@ io_tracer_parser: $(OBJ_DIR)/tools/io_tracer_parser.o $(TOOLS_LIBRARY) $(LIBRARY

db_blob_corruption_test: $(OBJ_DIR)/db/blob/db_blob_corruption_test.o $(TEST_LIBRARY) $(LIBRARY)
	$(AM_LINK)

db_write_buffer_manager_test: $(OBJ_DIR)/db/db_write_buffer_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
	$(AM_LINK)
#-------------------------------------------------
# make install related stuff
PREFIX ?= /usr/local
7 changes: 7 additions & 0 deletions TARGETS
@@ -1411,6 +1411,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"db_write_buffer_manager_test",
"db/db_write_buffer_manager_test.cc",
"parallel",
[],
[],
],
[
"db_write_test",
"db/db_write_test.cc",
8 changes: 8 additions & 0 deletions db/db_impl/db_impl.cc
@@ -270,6 +270,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
// is called by client and this seqnum is advanced.
preserve_deletes_seqnum_.store(0);

if (write_buffer_manager_) {
wbm_stall_.reset(new WBMStallInterface());
}
}

Status DBImpl::Resume() {
@@ -660,6 +664,10 @@ Status DBImpl::CloseHelper() {
}
}

if (write_buffer_manager_ && wbm_stall_) {
write_buffer_manager_->RemoveDBFromQueue(wbm_stall_.get());
}

if (ret.IsAborted()) {
// Reserve IsAborted() error for those where users didn't release
// certain resource and they can release them and come back and
57 changes: 57 additions & 0 deletions db/db_impl/db_impl.h
@@ -1048,6 +1048,56 @@ class DBImpl : public DB {
// flush LOG out of application buffer
void FlushInfoLog();

  // Interface to block and signal the DB in case of stalling writes by
  // WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface.
  // When DB needs to be blocked or signalled by WriteBufferManager,
  // state_ is changed accordingly.
  class WBMStallInterface : public StallInterface {
   public:
    enum State {
      BLOCKED = 0,
      RUNNING,
    };

    WBMStallInterface() : state_cv_(&state_mutex_) {
      MutexLock lock(&state_mutex_);
      state_ = State::RUNNING;
    }

    void SetState(State state) {
      MutexLock lock(&state_mutex_);
      state_ = state;
    }

    // Wait while state_ is State::BLOCKED (set beforehand via SetState())
    // until it is changed by WriteBufferManager. When stall is cleared,
    // Signal() is called to change the state and unblock the DB.
    void Block() override {
      MutexLock lock(&state_mutex_);
      while (state_ == State::BLOCKED) {
        TEST_SYNC_POINT("WBMStallInterface::BlockDB");
        state_cv_.Wait();
      }
    }

    // Called from WriteBufferManager. This function changes the state_
    // to State::RUNNING indicating the stall is cleared and DB can proceed.
    void Signal() override {
      MutexLock lock(&state_mutex_);
      state_ = State::RUNNING;
      state_cv_.Signal();
    }

   private:
    // Condition variable and mutex to block and
    // signal the DB during stalling process.
    port::Mutex state_mutex_;
    port::CondVar state_cv_;
    // State representing whether DB is running or blocked because of stall by
    // WriteBufferManager.
    State state_;
  };

protected:
const std::string dbname_;
std::string db_id_;
@@ -1526,6 +1576,10 @@ class DBImpl : public DB {
// `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);

// Begin stalling of writes when memory usage increases beyond a certain
// threshold.
void WriteBufferManagerStallWrites();

Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
WriteBatch* my_batch);

@@ -2230,6 +2284,9 @@ class DBImpl : public DB {
bool wal_in_db_path_;

BlobFileCompletionCallback blob_callback_;

// Pointer to WriteBufferManager stalling interface.
std::unique_ptr<StallInterface> wbm_stall_;
};

extern Options SanitizeOptions(const std::string& db, const Options& src,
37 changes: 37 additions & 0 deletions db/db_impl/db_impl_write.cc
@@ -964,6 +964,20 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
PERF_TIMER_START(write_pre_and_post_process_time);
}

  // If memory usage has exceeded a certain threshold,
  // write_buffer_manager_->ShouldStall() returns true to all threads writing to
  // all DBs and writers will be stalled.
  // This is a soft check because WriteBufferManager::buffer_size_ has already
  // been exceeded at this point, so no new write (including the current one)
  // will go through until memory usage is decreased.
  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldStall())) {
    if (write_options.no_slowdown) {
      status = Status::Incomplete("Write stall");
    } else {
      WriteBufferManagerStallWrites();
    }
  }

if (status.ok() && *need_log_sync) {
// Wait until the parallel syncs are finished. Any sync process has to sync
// the front log too so it is enough to check the status of front()
@@ -1536,6 +1550,29 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
return s;
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
void DBImpl::WriteBufferManagerStallWrites() {
  mutex_.AssertHeld();
  // First block future writer threads who want to add themselves to the queue
  // of WriteThread.
  write_thread_.BeginWriteStall();
  mutex_.Unlock();

  // Change the state to State::BLOCKED.
  static_cast<WBMStallInterface*>(wbm_stall_.get())
      ->SetState(WBMStallInterface::State::BLOCKED);
  // Then WriteBufferManager will add DB instance to its queue
  // and block this thread by calling WBMStallInterface::Block().
  write_buffer_manager_->BeginWriteStall(wbm_stall_.get());
  wbm_stall_->Block();

  mutex_.Lock();
  // Stall has ended. Signal writer threads so that they can add
  // themselves to the WriteThread queue for writes.
  write_thread_.EndWriteStall();
}

Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
WriteBatch* my_batch) {
assert(write_options.low_pri);