Skip to content

Commit

Permalink
Use generic threadpool for Windows environment (facebook#1120)
Browse files Browse the repository at this point in the history
Conditionally retrofit thread_posix for use with std::thread
  and reuse the same logic. Posix users continue using Posix interfaces.
  Enable XPRESS compression in test runs.
  Fix master introduced signed/unsigned mismatch.
  • Loading branch information
yuslepukhin authored and ajkr committed May 13, 2016
1 parent a400336 commit aab91b8
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 292 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ set(SOURCES
util/testharness.cc
util/testutil.cc
util/thread_local.cc
util/threadpool.cc
util/thread_status_impl.cc
util/thread_status_updater.cc
util/thread_status_util.cc
Expand Down
4 changes: 2 additions & 2 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ version: 1.0.{build}
before_build:
- md %APPVEYOR_BUILD_FOLDER%\build
- cd %APPVEYOR_BUILD_FOLDER%\build
- cmake -G "Visual Studio 12 Win64" -DOPTDBG=1 ..
- cmake -G "Visual Studio 12 Win64" -DOPTDBG=1 -DXPRESS=1 ..
- cd ..
build:
project: build\rocksdb.sln
parallel: true
verbosity: minimal
test:
test_script:
- ps: build_tools\run_ci_db_test.ps1 -EnableRerun -Run db_test -Exclude DBTest.Randomized,DBTest.FileCreationRandomFailure -Concurrency 18
- ps: build_tools\run_ci_db_test.ps1 -EnableRerun -Run db_test -Concurrency 10
- ps: build_tools\run_ci_db_test.ps1 -Run env_test -Concurrency 1

252 changes: 1 addition & 251 deletions port/win/env_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "util/sync_point.h"
#include "util/aligned_buffer.h"

#include "util/threadpool.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"

Expand Down Expand Up @@ -1834,257 +1835,6 @@ class WinEnv : public Env {

bool SupportsFastAllocate(const std::string& /* path */) { return false; }

class ThreadPool {
public:
ThreadPool()
: total_threads_limit_(1),
bgthreads_(0),
queue_(),
queue_len_(0U),
exit_all_threads_(false),
low_io_priority_(false),
env_(nullptr) {}

~ThreadPool() { assert(bgthreads_.size() == 0U); }

void JoinAllThreads() {
{
std::lock_guard<std::mutex> lock(mu_);
assert(!exit_all_threads_);
exit_all_threads_ = true;
bgsignal_.notify_all();
}

for (std::thread& th : bgthreads_) {
th.join();
}

// Subject to assert in the __dtor
bgthreads_.clear();
}

void SetHostEnv(Env* env) { env_ = env; }

// Return true if there is at least one thread needs to terminate.
bool HasExcessiveThread() const {
return bgthreads_.size() > total_threads_limit_;
}

// Return true iff the current thread is the excessive thread to terminate.
// Always terminate the running thread that is added last, even if there are
// more than one thread to terminate.
bool IsLastExcessiveThread(size_t thread_id) const {
return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
}

// Is one of the threads to terminate.
bool IsExcessiveThread(size_t thread_id) const {
return thread_id >= total_threads_limit_;
}

// Return the thread priority.
// This would allow its member-thread to know its priority.
Env::Priority GetThreadPriority() { return priority_; }

// Set the thread priority.
void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

void BGThread(size_t thread_id) {
while (true) {
// Wait until there is an item that is ready to run
std::unique_lock<std::mutex> uniqueLock(mu_);

// Stop waiting if the thread needs to do work or needs to terminate.
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
(queue_.empty() || IsExcessiveThread(thread_id))) {
bgsignal_.wait(uniqueLock);
}

if (exit_all_threads_) {
// mechanism to let BG threads exit safely
uniqueLock.unlock();
break;
}

if (IsLastExcessiveThread(thread_id)) {
// Current thread is the last generated one and is excessive.
// We always terminate excessive thread in the reverse order of
// generation time.
std::thread& terminating_thread = bgthreads_.back();
auto tid = terminating_thread.get_id();
// Ensure that this thread is ours
assert(tid == std::this_thread::get_id());
terminating_thread.detach();
bgthreads_.pop_back();

if (HasExcessiveThread()) {
// There is still at least more excessive thread to terminate.
WakeUpAllThreads();
}

uniqueLock.unlock();

PrintThreadInfo(thread_id, gettid());
break;
}

void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
queue_len_.store(queue_.size(), std::memory_order_relaxed);

uniqueLock.unlock();
(*function)(arg);
}
}

// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
ThreadPool* thread_pool_;
size_t thread_id_; // Thread count in the thread.

BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {}
};

static void* BGThreadWrapper(void* arg) {
std::unique_ptr<BGThreadMetadata> meta(
reinterpret_cast<BGThreadMetadata*>(arg));

size_t thread_id = meta->thread_id_;
ThreadPool* tp = meta->thread_pool_;

#if ROCKSDB_USING_THREAD_STATUS
// for thread-status
ThreadStatusUtil::RegisterThread(
tp->env_, (tp->GetThreadPriority() == Env::Priority::HIGH
? ThreadStatus::HIGH_PRIORITY
: ThreadStatus::LOW_PRIORITY));
#endif
tp->BGThread(thread_id);
#if ROCKSDB_USING_THREAD_STATUS
ThreadStatusUtil::UnregisterThread();
#endif
return nullptr;
}

void WakeUpAllThreads() { bgsignal_.notify_all(); }

void SetBackgroundThreadsInternal(size_t num, bool allow_reduce) {
std::lock_guard<std::mutex> lg(mu_);

if (exit_all_threads_) {
return;
}

if (num > total_threads_limit_ ||
(num < total_threads_limit_ && allow_reduce)) {
total_threads_limit_ = std::max(size_t(1), num);
WakeUpAllThreads();
StartBGThreads();
}
assert(total_threads_limit_ > 0);
}

void IncBackgroundThreadsIfNeeded(int num) {
SetBackgroundThreadsInternal(num, false);
}

void SetBackgroundThreads(int num) {
SetBackgroundThreadsInternal(num, true);
}

void StartBGThreads() {
// Start background thread if necessary
while (bgthreads_.size() < total_threads_limit_) {
std::thread p_t(&ThreadPool::BGThreadWrapper,
new BGThreadMetadata(this, bgthreads_.size()));
bgthreads_.push_back(std::move(p_t));
}
}

void Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg)) {
std::lock_guard<std::mutex> lg(mu_);

if (exit_all_threads_) {
return;
}

StartBGThreads();

// Add to priority queue
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
queue_.back().tag = tag;
queue_.back().unschedFunction = unschedFunction;
queue_len_.store(queue_.size(), std::memory_order_relaxed);

if (!HasExcessiveThread()) {
// Wake up at least one waiting thread.
bgsignal_.notify_one();
} else {
// Need to wake up all threads to make sure the one woken
// up is not the one to terminate.
WakeUpAllThreads();
}
}

int UnSchedule(void* arg) {
int count = 0;

std::lock_guard<std::mutex> lg(mu_);

// Remove from priority queue
BGQueue::iterator it = queue_.begin();
while (it != queue_.end()) {
if (arg == (*it).tag) {
void (*unschedFunction)(void*) = (*it).unschedFunction;
void* arg1 = (*it).arg;
if (unschedFunction != nullptr) {
(*unschedFunction)(arg1);
}
it = queue_.erase(it);
count++;
} else {
++it;
}
}

queue_len_.store(queue_.size(), std::memory_order_relaxed);

return count;
}

unsigned int GetQueueLen() const {
return static_cast<unsigned int>(
queue_len_.load(std::memory_order_relaxed));
}

private:
// Entry per Schedule() call
struct BGItem {
void* arg;
void (*function)(void*);
void* tag;
void (*unschedFunction)(void*);
};

typedef std::deque<BGItem> BGQueue;

std::mutex mu_;
std::condition_variable bgsignal_;
size_t total_threads_limit_;
std::vector<std::thread> bgthreads_;
BGQueue queue_;
std::atomic_size_t queue_len_; // Queue length. Used for stats reporting
bool exit_all_threads_;
bool low_io_priority_;
Env::Priority priority_;
Env* env_;
};

bool checkedDiskForMmap_;
bool forceMmapOff; // do we override Env options?
size_t page_size_;
Expand Down
2 changes: 1 addition & 1 deletion src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ LIB_SOURCES = \
util/env_hdfs.cc \
util/env_posix.cc \
util/io_posix.cc \
util/thread_posix.cc \
util/threadpool.cc \
util/transaction_test_util.cc \
util/sst_file_manager_impl.cc \
util/file_util.cc \
Expand Down
2 changes: 1 addition & 1 deletion util/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#include "rocksdb/slice.h"
#include "util/coding.h"
#include "util/io_posix.h"
#include "util/thread_posix.h"
#include "util/threadpool.h"
#include "util/iostats_context_imp.h"
#include "util/logging.h"
#include "util/posix_logger.h"
Expand Down
Loading

0 comments on commit aab91b8

Please sign in to comment.