diff --git a/HISTORY.md b/HISTORY.md index be929296cf1..2e48a5a40af 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,9 @@ * Fixed a bug where ingested files were written with incorrect boundary key metadata. In rare cases this could have led to a level's files being wrongly ordered and queries for the boundary keys returning wrong results. * Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`. +### New Features +* Add new option allow_stall passed during instance creation of WriteBufferManager. When allow_stall is set, WriteBufferManager will stall all writers shared across multiple DBs and columns if memory usage goes beyond specified WriteBufferManager::buffer_size (soft limit). Stall will be cleared when memory is freed after flush and memory usage goes down below buffer_size. + ## 6.20.0 (04/16/2021) ### Behavior Changes * `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush. diff --git a/Makefile b/Makefile index 4e7e0c16bcc..4e5191a4737 100644 --- a/Makefile +++ b/Makefile @@ -1859,6 +1859,9 @@ io_tracer_parser: $(OBJ_DIR)/tools/io_tracer_parser.o $(TOOLS_LIBRARY) $(LIBRARY db_blob_corruption_test: $(OBJ_DIR)/db/blob/db_blob_corruption_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) + +db_write_buffer_manager_test: $(OBJ_DIR)/db/db_write_buffer_manager_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) #------------------------------------------------- # make install related stuff PREFIX ?= /usr/local diff --git a/TARGETS b/TARGETS index 4cb069139e1..ea89b657f05 100644 --- a/TARGETS +++ b/TARGETS @@ -1411,6 +1411,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_write_buffer_manager_test", + "db/db_write_buffer_manager_test.cc", + "parallel", + [], + [], + ], [ "db_write_test", "db/db_write_test.cc", diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d9f60998492..dd5a3335dfd 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -270,6 +270,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber() // is called by client and this seqnum is advanced. preserve_deletes_seqnum_.store(0); + + if (write_buffer_manager_) { + wbm_stall_.reset(new WBMStallInterface()); + } } Status DBImpl::Resume() { @@ -660,6 +664,10 @@ Status DBImpl::CloseHelper() { } } + if (write_buffer_manager_ && wbm_stall_) { + write_buffer_manager_->RemoveDBFromQueue(wbm_stall_.get()); + } + if (ret.IsAborted()) { // Reserve IsAborted() error for those where users didn't release // certain resource and they can release them and come back and diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 57edeb60e61..0841f485b82 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1048,6 +1048,56 @@ class DBImpl : public DB { // flush LOG out of application buffer void FlushInfoLog(); + // Interface to block and signal the DB in case of stalling writes by + // WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface. + // When DB needs to be blocked or signalled by WriteBufferManager, + // state_ is changed accordingly. + class WBMStallInterface : public StallInterface { + public: + enum State { + BLOCKED = 0, + RUNNING, + }; + + WBMStallInterface() : state_cv_(&state_mutex_) { + MutexLock lock(&state_mutex_); + state_ = State::RUNNING; + } + + void SetState(State state) { + MutexLock lock(&state_mutex_); + state_ = state; + } + + // Change the state_ to State::BLOCKED and wait until its state is + // changed by WriteBufferManager. When stall is cleared, Signal() is + // called to change the state and unblock the DB. + void Block() override { + MutexLock lock(&state_mutex_); + while (state_ == State::BLOCKED) { + TEST_SYNC_POINT("WBMStallInterface::BlockDB"); + state_cv_.Wait(); + } + } + + // Called from WriteBufferManager. This function changes the state_ + // to State::RUNNING indicating the stall is cleared and DB can proceed. + void Signal() override { + MutexLock lock(&state_mutex_); + state_ = State::RUNNING; + state_cv_.Signal(); + } + + private: + // Conditional variable and mutex to block and + // signal the DB during stalling process. + port::Mutex state_mutex_; + port::CondVar state_cv_; + // state represting whether DB is running or blocked because of stall by + // WriteBufferManager. + State state_; + }; + protected: const std::string dbname_; std::string db_id_; @@ -1526,6 +1576,10 @@ class DBImpl : public DB { // `num_bytes` going through. Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options); + // Begin stalling of writes when memory usage increases beyond a certain + // threshold. + void WriteBufferManagerStallWrites(); + Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, WriteBatch* my_batch); @@ -2230,6 +2284,9 @@ class DBImpl : public DB { bool wal_in_db_path_; BlobFileCompletionCallback blob_callback_; + + // Pointer to WriteBufferManager stalling interface. + std::unique_ptr wbm_stall_; }; extern Options SanitizeOptions(const std::string& db, const Options& src, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index f9a6adc078e..99570c9c52a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -964,6 +964,20 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); } + // If memory usage exceeded beyond a certain threshold, + // write_buffer_manager_->ShouldStall() returns true to all threads writing to + // all DBs and writers will be stalled. + // It does soft checking because WriteBufferManager::buffer_limit_ has already + // exceeded at this point so no new write (including current one) will go + // through until memory usage is decreased. + if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldStall())) { + if (write_options.no_slowdown) { + status = Status::Incomplete("Write stall"); + } else { + WriteBufferManagerStallWrites(); + } + } + if (status.ok() && *need_log_sync) { // Wait until the parallel syncs are finished. Any sync process has to sync // the front log too so it is enough to check the status of front() @@ -1536,6 +1550,29 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, return s; } +// REQUIRES: mutex_ is held +// REQUIRES: this thread is currently at the front of the writer queue +void DBImpl::WriteBufferManagerStallWrites() { + mutex_.AssertHeld(); + // First block future writer threads who want to add themselves to the queue + // of WriteThread. + write_thread_.BeginWriteStall(); + mutex_.Unlock(); + + // Change the state to State::Blocked. + static_cast(wbm_stall_.get()) + ->SetState(WBMStallInterface::State::BLOCKED); + // Then WriteBufferManager will add DB instance to its queue + // and block this thread by calling WBMStallInterface::Block(). + write_buffer_manager_->BeginWriteStall(wbm_stall_.get()); + wbm_stall_->Block(); + + mutex_.Lock(); + // Stall has ended. Signal writer threads so that they can add + // themselves to the WriteThread queue for writes. + write_thread_.EndWriteStall(); +} + Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, WriteBatch* my_batch) { assert(write_options.low_pri); diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc new file mode 100644 index 00000000000..0ae74475284 --- /dev/null +++ b/db/db_write_buffer_manager_test.cc @@ -0,0 +1,801 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_test_util.h" +#include "db/write_thread.h" +#include "port/stack_trace.h" + +namespace ROCKSDB_NAMESPACE { + +class DBWriteBufferManagerTest : public DBTestBase, + public testing::WithParamInterface { + public: + DBWriteBufferManagerTest() + : DBTestBase("/db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + bool cost_cache_; +}; + +TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) { + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + + WriteOptions wo; + wo.disableWAL = true; + + CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + + // Write to "Default", "cf2" and "cf3". + ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(40000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + + ASSERT_OK(Put(3, Key(2), DummyString(40000), wo)); + // WriteBufferManager::buffer_size_ has exceeded after the previous write is + // completed. + + // This make sures write will go through and if stall was in effect, it will + // end. + ASSERT_OK(Put(0, Key(2), DummyString(1), wo)); +} + +// Test Single DB with multiple writer threads get blocked when +// WriteBufferManager execeeds buffer_size_ and flush is waiting to be +// finished. +TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) { + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + WriteOptions wo; + wo.disableWAL = true; + + CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + + // Write to "Default", "cf2" and "cf3". No flush will be triggered. + ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(40000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + + ASSERT_OK(Put(3, Key(2), DummyString(40000), wo)); + // WriteBufferManager::buffer_size_ has exceeded after the previous write is + // completed. + + std::unordered_set w_set; + std::vector threads; + int wait_count_db = 0; + int num_writers = 4; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::atomic thread_num(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.SignalAll(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + InstrumentedMutexLock lock(&mutex); + WriteThread::Writer* w = reinterpret_cast(arg); + w_set.insert(w); + // Allow the flush to continue if all writer threads are blocked. + if (w_set.size() == (unsigned long)num_writers) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s = true; + + std::function writer = [&](int cf) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + Status tmp = Put(cf, Slice(key), DummyString(1), wo); + InstrumentedMutexLock lock(&mutex); + s = s && tmp.ok(); + }; + + // Flow: + // main_writer thread will write but will be blocked (as Flush will on hold, + // buffer_size_ has exceeded, thus will create stall in effect). + // | + // | + // multiple writer threads will be created to write across multiple columns + // and they will be blocked. + // | + // | + // Last writer thread will write and when its blocked it will signal Flush to + // continue to clear the stall. + + threads.emplace_back(writer, 1); + // Wait untill first thread (main_writer) writing to DB is blocked and then + // create the multiple writers which will be blocked from getting added to the + // queue because stall is in effect. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + for (int i = 0; i < num_writers; i++) { + threads.emplace_back(writer, i % 4); + } + for (auto& t : threads) { + t.join(); + } + + ASSERT_TRUE(s); + + // Number of DBs blocked. + ASSERT_EQ(wait_count_db, 1); + // Number of Writer threads blocked. + ASSERT_EQ(w_set.size(), num_writers); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple DBs get blocked when WriteBufferManager limit exceeds and flush +// is waiting to be finished but DBs tries to write meanwhile. +TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) { + std::vector dbnames; + std::vector dbs; + int num_dbs = 3; + + for (int i = 0; i < num_dbs; i++) { + dbs.push_back(nullptr); + dbnames.push_back( + test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i))); + } + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(DestroyDB(dbnames[i], options)); + ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i]))); + } + WriteOptions wo; + wo.disableWAL = true; + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000))); + } + // Insert to db_. + ASSERT_OK(Put(0, Key(1), DummyString(30000), wo)); + + // WriteBufferManager Limit exceeded. + std::vector threads; + int wait_count_db = 0; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.Signal(); + // Since this is the last DB, signal Flush to continue. + if (wait_count_db == num_dbs + 1) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s = true; + + // Write to DB. + std::function write_db = [&](DB* db) { + Status tmp = db->Put(wo, Key(3), DummyString(1)); + InstrumentedMutexLock lock(&mutex); + s = s && tmp.ok(); + }; + + // Flow: + // db_ will write and will be blocked (as Flush will on hold and will create + // stall in effect). + // | + // multiple dbs writers will be created to write to that db and they will be + // blocked. + // | + // | + // Last writer will write and when its blocked it will signal Flush to + // continue to clear the stall. + + threads.emplace_back(write_db, db_); + // Wait untill first DB is blocked and then create the multiple writers for + // different DBs which will be blocked from getting added to the queue because + // stall is in effect. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + for (int i = 0; i < num_dbs; i++) { + threads.emplace_back(write_db, dbs[i]); + } + for (auto& t : threads) { + t.join(); + } + + ASSERT_TRUE(s); + ASSERT_EQ(num_dbs + 1, wait_count_db); + // Clean up DBs. + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Close()); + ASSERT_OK(DestroyDB(dbnames[i], options)); + delete dbs[i]; + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple threads writing across multiple DBs and multiple columns get +// blocked when stall by WriteBufferManager is in effect. +TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) { + std::vector dbnames; + std::vector dbs; + int num_dbs = 3; + + for (int i = 0; i < num_dbs; i++) { + dbs.push_back(nullptr); + dbnames.push_back( + test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i))); + } + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(DestroyDB(dbnames[i], options)); + ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i]))); + } + WriteOptions wo; + wo.disableWAL = true; + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000))); + } + // Insert to db_. + ASSERT_OK(Put(0, Key(1), DummyString(30000), wo)); + + // WriteBufferManager::buffer_size_ has exceeded after the previous write to + // dbs[0] is completed. + std::vector threads; + int wait_count_db = 0; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::unordered_set w_set; + std::vector writer_threads; + std::atomic thread_num(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + thread_num.fetch_add(1); + cv.Signal(); + // Allow the flush to continue if all writer threads are blocked. + if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + WriteThread::Writer* w = reinterpret_cast(arg); + { + InstrumentedMutexLock lock(&mutex); + w_set.insert(w); + thread_num.fetch_add(1); + // Allow the flush continue if all writer threads are blocked. + if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s1 = true, s2 = true; + // Write to multiple columns of db_. + std::function write_cf = [&](int cf) { + Status tmp = Put(cf, Key(3), DummyString(1), wo); + InstrumentedMutexLock lock(&mutex); + s1 = s1 && tmp.ok(); + }; + // Write to multiple DBs. + std::function write_db = [&](DB* db) { + Status tmp = db->Put(wo, Key(3), DummyString(1)); + InstrumentedMutexLock lock(&mutex); + s2 = s2 && tmp.ok(); + }; + + // Flow: + // thread will write to db_ will be blocked (as Flush will on hold, + // buffer_size_ has exceeded and will create stall in effect). + // | + // | + // multiple writers threads writing to different DBs and to db_ across + // multiple columns will be created and they will be blocked due to stall. + // | + // | + // Last writer thread will write and when its blocked it will signal Flush to + // continue to clear the stall. + threads.emplace_back(write_db, db_); + // Wait untill first thread is blocked and then create the multiple writer + // threads. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + + for (int i = 0; i < num_dbs; i++) { + // Write to multiple columns of db_. + writer_threads.emplace_back(write_cf, i % 3); + // Write to different dbs. + threads.emplace_back(write_db, dbs[i]); + } + for (auto& t : threads) { + t.join(); + } + for (auto& t : writer_threads) { + t.join(); + } + + ASSERT_TRUE(s1); + ASSERT_TRUE(s2); + + // Number of DBs blocked. + ASSERT_EQ(num_dbs + 1, wait_count_db); + // Number of Writer threads blocked. + ASSERT_EQ(w_set.size(), num_dbs); + // Clean up DBs. + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Close()); + ASSERT_OK(DestroyDB(dbnames[i], options)); + delete dbs[i]; + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple threads writing across multiple columns of db_ by passing +// different values to WriteOption.no_slown_down. +TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) { + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + WriteOptions wo; + wo.disableWAL = true; + + CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options); + + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + + // Write to "Default", "cf2" and "cf3". No flush will be triggered. + ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(40000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(3, Key(2), DummyString(40000), wo)); + + // WriteBufferManager::buffer_size_ has exceeded after the previous write to + // db_ is completed. + + std::unordered_set w_slowdown_set; + std::vector threads; + int wait_count_db = 0; + int num_writers = 4; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::atomic thread_num(0); + std::atomic w_no_slowdown(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.SignalAll(); + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + { + InstrumentedMutexLock lock(&mutex); + WriteThread::Writer* w = reinterpret_cast(arg); + w_slowdown_set.insert(w); + // Allow the flush continue if all writer threads are blocked. + if (w_slowdown_set.size() + (unsigned long)w_no_slowdown.load( + std::memory_order_relaxed) == + (unsigned long)num_writers) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s1 = true, s2 = true; + + std::function write_slow_down = [&](int cf) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = false; + Status tmp = Put(cf, Slice(key), DummyString(1), write_op); + InstrumentedMutexLock lock(&mutex); + s1 = s1 && tmp.ok(); + }; + + std::function write_no_slow_down = [&](int cf) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = true; + Status tmp = Put(cf, Slice(key), DummyString(1), write_op); + { + InstrumentedMutexLock lock(&mutex); + s2 = s2 && !tmp.ok(); + w_no_slowdown.fetch_add(1); + // Allow the flush continue if all writer threads are blocked. + if (w_slowdown_set.size() + + (unsigned long)w_no_slowdown.load(std::memory_order_relaxed) == + (unsigned long)num_writers) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }; + + // Flow: + // main_writer thread will write but will be blocked (as Flush will on hold, + // buffer_size_ has exceeded, thus will create stall in effect). + // | + // | + // multiple writer threads will be created to write across multiple columns + // with different values of WriteOptions.no_slowdown. Some of them will + // be blocked and some of them will return with Incomplete status. + // | + // | + // Last writer thread will write and when its blocked/return it will signal + // Flush to continue to clear the stall. + threads.emplace_back(write_slow_down, 1); + // Wait untill first thread (main_writer) writing to DB is blocked and then + // create the multiple writers which will be blocked from getting added to the + // queue because stall is in effect. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + + for (int i = 0; i < num_writers; i += 2) { + threads.emplace_back(write_no_slow_down, (i) % 4); + threads.emplace_back(write_slow_down, (i + 1) % 4); + } + for (auto& t : threads) { + t.join(); + } + + ASSERT_TRUE(s1); + ASSERT_TRUE(s2); + // Number of DBs blocked. + ASSERT_EQ(wait_count_db, 1); + // Number of Writer threads blocked. + ASSERT_EQ(w_slowdown_set.size(), num_writers / 2); + // Number of Writer threads with WriteOptions.no_slowdown = true. + ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_writers / 2); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Test multiple threads writing across multiple columns of db_ and different +// dbs by passing different values to WriteOption.no_slown_down. +TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { + std::vector dbnames; + std::vector dbs; + int num_dbs = 4; + + for (int i = 0; i < num_dbs; i++) { + dbs.push_back(nullptr); + dbnames.push_back( + test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i))); + } + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + cost_cache_ = GetParam(); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(100000, nullptr, true)); + } + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(DestroyDB(dbnames[i], options)); + ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i]))); + } + WriteOptions wo; + wo.disableWAL = true; + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000))); + } + // Insert to db_. + ASSERT_OK(Put(0, Key(1), DummyString(30000), wo)); + + // WriteBufferManager::buffer_size_ has exceeded after the previous write to + // dbs[0] is completed. + std::vector threads; + int wait_count_db = 0; + InstrumentedMutex mutex; + InstrumentedCondVar cv(&mutex); + std::unordered_set w_slowdown_set; + std::vector writer_threads; + std::atomic thread_num(0); + std::atomic w_no_slowdown(0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0", + "DBImpl::BackgroundCallFlush:start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WBMStallInterface::BlockDB", [&](void*) { + InstrumentedMutexLock lock(&mutex); + wait_count_db++; + cv.Signal(); + // Allow the flush continue if all writer threads are blocked. + if (w_slowdown_set.size() + + (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) + + wait_count_db) == + (unsigned long)(2 * num_dbs + 1)) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::WriteStall::Wait", [&](void* arg) { + WriteThread::Writer* w = reinterpret_cast(arg); + InstrumentedMutexLock lock(&mutex); + w_slowdown_set.insert(w); + // Allow the flush continue if all writer threads are blocked. + if (w_slowdown_set.size() + + (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) + + wait_count_db) == + (unsigned long)(2 * num_dbs + 1)) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + bool s1 = true, s2 = true; + std::function write_slow_down = [&](DB* db) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = false; + Status tmp = db->Put(write_op, Slice(key), DummyString(1)); + InstrumentedMutexLock lock(&mutex); + s1 = s1 && tmp.ok(); + }; + + std::function write_no_slow_down = [&](DB* db) { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions write_op; + write_op.no_slowdown = true; + Status tmp = db->Put(write_op, Slice(key), DummyString(1)); + { + InstrumentedMutexLock lock(&mutex); + s2 = s2 && !tmp.ok(); + w_no_slowdown.fetch_add(1); + if (w_slowdown_set.size() + + (unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) + + wait_count_db) == + (unsigned long)(2 * num_dbs + 1)) { + TEST_SYNC_POINT( + "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0"); + } + } + }; + + // Flow: + // first thread will write but will be blocked (as Flush will on hold, + // buffer_size_ has exceeded, thus will create stall in effect). + // | + // | + // multiple writer threads will be created to write across multiple columns + // of db_ and different DBs with different values of + // WriteOptions.no_slowdown. Some of them will be blocked and some of them + // will return with Incomplete status. + // | + // | + // Last writer thread will write and when its blocked/return it will signal + // Flush to continue to clear the stall. + threads.emplace_back(write_slow_down, db_); + // Wait untill first thread writing to DB is blocked and then + // create the multiple writers. + { + InstrumentedMutexLock lock(&mutex); + while (wait_count_db != 1) { + cv.Wait(); + } + } + + for (int i = 0; i < num_dbs; i += 2) { + // Write to multiple columns of db_. + writer_threads.emplace_back(write_slow_down, db_); + writer_threads.emplace_back(write_no_slow_down, db_); + // Write to different DBs. + threads.emplace_back(write_slow_down, dbs[i]); + threads.emplace_back(write_no_slow_down, dbs[i + 1]); + } + + for (auto& t : threads) { + t.join(); + } + + for (auto& t : writer_threads) { + t.join(); + } + + ASSERT_TRUE(s1); + ASSERT_TRUE(s2); + // Number of DBs blocked. + ASSERT_EQ((num_dbs / 2) + 1, wait_count_db); + // Number of writer threads writing to db_ blocked from getting added to the + // queue. + ASSERT_EQ(w_slowdown_set.size(), num_dbs / 2); + // Number of threads with WriteOptions.no_slowdown = true. + ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_dbs); + + // Clean up DBs. + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Close()); + ASSERT_OK(DestroyDB(dbnames[i], options)); + delete dbs[i]; + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest, + testing::Bool()); + +} // namespace ROCKSDB_NAMESPACE + +#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS +extern "C" { +void RegisterCustomObjects(int argc, char** argv); +} +#else +void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {} +#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + RegisterCustomObjects(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/write_thread.cc b/db/write_thread.cc index a3bbce2eebd..ac3a2f86915 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -241,6 +241,7 @@ bool WriteThread::LinkOne(Writer* w, std::atomic* newest_writer) { MutexLock lock(&stall_mu_); writers = newest_writer->load(std::memory_order_relaxed); if (writers == &write_stall_dummy_) { + TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w); stall_cv_.Wait(); // Load newest_writers_ again since it may have changed writers = newest_writer->load(std::memory_order_relaxed); diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index aa44c140675..67aef7f8fe7 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -13,43 +13,86 @@ #pragma once #include +#include #include +#include +#include + #include "rocksdb/cache.h" namespace ROCKSDB_NAMESPACE { +// Interface to block and signal DB instances. +// Each DB instance contains ptr to StallInterface. +class StallInterface { + public: + virtual ~StallInterface() {} + + virtual void Block() = 0; + + virtual void Signal() = 0; +}; + class WriteBufferManager { public: - // _buffer_size = 0 indicates no limit. Memory won't be capped. + // Parameters: + // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped. // memory_usage() won't be valid and ShouldFlush() will always return true. - // if `cache` is provided, we'll put dummy entries in the cache and cost - // the memory allocated to the cache. It can be used even if _buffer_size = 0. + // + // cache_: if `cache` is provided, we'll put dummy entries in the cache and + // cost the memory allocated to the cache. It can be used even if _buffer_size + // = 0. + // + // allow_stall: if set true, it will enable stalling of writes when + // memory_usage() exceeds buffer_size. It will wait for flush to complete and + // memory usage to drop down. explicit WriteBufferManager(size_t _buffer_size, - std::shared_ptr cache = {}); + std::shared_ptr cache = {}, + bool allow_stall = false); // No copying allowed WriteBufferManager(const WriteBufferManager&) = delete; WriteBufferManager& operator=(const WriteBufferManager&) = delete; ~WriteBufferManager(); + // Returns true if buffer_limit is passed to limit the total memory usage and + // is greater than 0. bool enabled() const { return buffer_size() > 0; } + // Returns true if pointer to cache is passed. bool cost_to_cache() const { return cache_rep_ != nullptr; } + // Returns the total memory used by memtables. // Only valid if enabled() size_t memory_usage() const { return memory_used_.load(std::memory_order_relaxed); } + + // Returns the total memory used by active memtables. size_t mutable_memtable_memory_usage() const { return memory_active_.load(std::memory_order_relaxed); } + size_t dummy_entries_in_cache_usage() const { return dummy_size_.load(std::memory_order_relaxed); } + + // Returns the buffer_size. size_t buffer_size() const { return buffer_size_.load(std::memory_order_relaxed); } + void SetBufferSize(size_t new_size) { + buffer_size_.store(new_size, std::memory_order_relaxed); + mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed); + // Check if stall is active and can be ended. + if (allow_stall_) { + EndWriteStall(); + } + } + + // Below functions should be called by RocksDB internally. + // Should only be called from write thread bool ShouldFlush() const { if (enabled()) { @@ -69,35 +112,50 @@ class WriteBufferManager { return false; } - void ReserveMem(size_t mem) { - if (cache_rep_ != nullptr) { - ReserveMemWithCache(mem); - } else if (enabled()) { - memory_used_.fetch_add(mem, std::memory_order_relaxed); - } - if (enabled()) { - memory_active_.fetch_add(mem, std::memory_order_relaxed); + // Returns true if total memory usage exceeded buffer_size. + // We stall the writes untill memory_usage drops below buffer_size. When the + // function returns true, all writer threads (including one checking this + // condition) across all DBs will be stalled. Stall is allowed only if user + // pass allow_stall = true during WriteBufferManager instance creation. + // + // Should only be called by RocksDB internally . + bool ShouldStall() { + if (allow_stall_ && enabled()) { + if (IsStallActive()) { + return true; + } + if (IsStallThresholdExceeded()) { + stall_active_.store(true, std::memory_order_relaxed); + return true; + } } + return false; + } + + // Returns true if stall is active. + bool IsStallActive() const { + return stall_active_.load(std::memory_order_relaxed); } + + // Returns true if stalling condition is met. + bool IsStallThresholdExceeded() { return memory_usage() >= buffer_size_; } + + void ReserveMem(size_t mem); + // We are in the process of freeing `mem` bytes, so it is not considered // when checking the soft limit. - void ScheduleFreeMem(size_t mem) { - if (enabled()) { - memory_active_.fetch_sub(mem, std::memory_order_relaxed); - } - } - void FreeMem(size_t mem) { - if (cache_rep_ != nullptr) { - FreeMemWithCache(mem); - } else if (enabled()) { - memory_used_.fetch_sub(mem, std::memory_order_relaxed); - } - } + void ScheduleFreeMem(size_t mem); - void SetBufferSize(size_t new_size) { - buffer_size_.store(new_size, std::memory_order_relaxed); - mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed); - } + void FreeMem(size_t mem); + + // Add the DB instance to the queue and block the DB. + // Should only be called by RocksDB internally. + void BeginWriteStall(StallInterface* wbm_stall); + + // Remove DB instances from queue and signal them to continue. + void EndWriteStall(); + + void RemoveDBFromQueue(StallInterface* wbm_stall); private: std::atomic buffer_size_; @@ -108,6 +166,11 @@ class WriteBufferManager { std::atomic dummy_size_; struct CacheRep; std::unique_ptr cache_rep_; + std::list queue_; + // Protects the queue_ + std::mutex mu_; + bool allow_stall_; + std::atomic stall_active_; void ReserveMemWithCache(size_t mem); void FreeMemWithCache(size_t mem); diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index f6451032a21..4547c20f840 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -8,7 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "rocksdb/write_buffer_manager.h" -#include + +#include "db/db_impl/db_impl.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { @@ -49,13 +50,16 @@ struct WriteBufferManager::CacheRep {}; #endif // ROCKSDB_LITE WriteBufferManager::WriteBufferManager(size_t _buffer_size, - std::shared_ptr cache) + std::shared_ptr cache, + bool allow_stall) : buffer_size_(_buffer_size), mutable_limit_(buffer_size_ * 7 / 8), memory_used_(0), memory_active_(0), dummy_size_(0), - cache_rep_(nullptr) { + cache_rep_(nullptr), + allow_stall_(allow_stall), + stall_active_(false) { #ifndef ROCKSDB_LITE if (cache) { // Construct the cache key using the pointer to this. @@ -78,6 +82,17 @@ WriteBufferManager::~WriteBufferManager() { #endif // ROCKSDB_LITE } +void WriteBufferManager::ReserveMem(size_t mem) { + if (cache_rep_ != nullptr) { + ReserveMemWithCache(mem); + } else if (enabled()) { + memory_used_.fetch_add(mem, std::memory_order_relaxed); + } + if (enabled()) { + memory_active_.fetch_add(mem, std::memory_order_relaxed); + } +} + // Should only be called from write thread void WriteBufferManager::ReserveMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE @@ -112,6 +127,24 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) { #endif // ROCKSDB_LITE } +void WriteBufferManager::ScheduleFreeMem(size_t mem) { + if (enabled()) { + memory_active_.fetch_sub(mem, std::memory_order_relaxed); + } +} + +void WriteBufferManager::FreeMem(size_t mem) { + if (cache_rep_ != nullptr) { + FreeMemWithCache(mem); + } else if (enabled()) { + memory_used_.fetch_sub(mem, std::memory_order_relaxed); + } + // Check if stall is active and can be ended. + if (allow_stall_) { + EndWriteStall(); + } +} + void WriteBufferManager::FreeMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE assert(cache_rep_ != nullptr); @@ -145,4 +178,50 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) { (void)mem; #endif // ROCKSDB_LITE } + +void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) { + assert(wbm_stall != nullptr); + if (wbm_stall) { + std::unique_lock lock(mu_); + queue_.push_back(wbm_stall); + } + // In case thread enqueue itself and memory got freed in parallel, end the + // stall. + if (!ShouldStall()) { + EndWriteStall(); + } +} + +// Called when memory is freed in FreeMem. +void WriteBufferManager::EndWriteStall() { + if (enabled() && !IsStallThresholdExceeded()) { + { + std::unique_lock lock(mu_); + stall_active_.store(false, std::memory_order_relaxed); + if (queue_.empty()) { + return; + } + } + + // Get the instances from the list and call WBMStallInterface::Signal to + // change the state to running and unblock the DB instances. + // Check ShouldStall() incase stall got active by other DBs. + while (!ShouldStall() && !queue_.empty()) { + std::unique_lock lock(mu_); + StallInterface* wbm_stall = queue_.front(); + queue_.pop_front(); + wbm_stall->Signal(); + } + } +} + +void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) { + assert(wbm_stall != nullptr); + if (enabled() && allow_stall_) { + std::unique_lock lock(mu_); + queue_.remove(wbm_stall); + wbm_stall->Signal(); + } +} + } // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 371db196b9f..a0c2e76e6d2 100644 --- a/src.mk +++ b/src.mk @@ -424,6 +424,7 @@ TEST_MAIN_SOURCES = \ db/db_universal_compaction_test.cc \ db/db_wal_test.cc \ db/db_with_timestamp_compaction_test.cc \ + db/db_write_buffer_manager_test.cc \ db/db_write_test.cc \ db/dbformat_test.cc \ db/deletefile_test.cc \