Skip to content

Commit

Permalink
Add a C++11 ThreadPool implementation in LLVM
Browse files Browse the repository at this point in the history
This is a very simple implementation of a thread pool using C++11
thread. It accepts any std::function<void()> for asynchronous
execution. Individual task can be synchronize using the returned
future, or the client can block on the full queue completion.

In case LLVM is configured with Threading disabled, it falls back
to sequential execution using std::async with launch:deferred.

This is intended to support parallelism for ThinLTO processing in
linker plugin, but is generic enough for any other uses.

This is a recommit of r255444 ; trying to workaround a bug in the
MSVC 2013 standard library. I think I was hit by:

 http://connect.microsoft.com/VisualStudio/feedbackdetail/view/791185/std-packaged-task-t-where-t-is-void-or-a-reference-class-are-not-movable

Recommit of r255589, trying to please g++ as well.

Differential Revision: http://reviews.llvm.org/D15464

From: mehdi_amini <mehdi_amini@91177308-0d34-0410-b5e6-96231b3b80d8>

git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@255593 91177308-0d34-0410-b5e6-96231b3b80d8
  • Loading branch information
joker-eph committed Dec 15, 2015
1 parent 8d176bb commit abb30d1
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 0 deletions.
115 changes: 115 additions & 0 deletions include/llvm/Support/ThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREAD_POOL_H
#define LLVM_SUPPORT_THREAD_POOL_H

#include "llvm/Support/thread.h"

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

namespace llvm {

/// A ThreadPool for asynchronous parallel execution on a defined number of
/// threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class ThreadPool {
public:
#ifndef _MSC_VER
using VoidTy = void;
using TaskTy = std::function<void()>;
using PackagedTaskTy = std::packaged_task<void()>;
#else
// MSVC 2013 has a bug and can't use std::packaged_task<void()>;
// We force it to use bool(bool) instead.
using VoidTy = bool;
using TaskTy = std::function<bool(bool)>;
using PackagedTaskTy = std::packaged_task<bool(bool)>;
#endif

/// Construct a pool with the number of core available on the system (or
/// whatever the value returned by std::thread::hardware_concurrency() is).
ThreadPool();

/// Construct a pool of \p ThreadCount threads
ThreadPool(unsigned ThreadCount);

/// Blocking destructor: the pool will wait for all the threads to complete.
~ThreadPool();

/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
template <typename Function, typename... Args>
inline std::shared_future<VoidTy> async(Function &&F, Args &&... ArgList) {
auto Task =
std::bind(std::forward<Function>(F), std::forward<Args...>(ArgList...));
#ifndef _MSC_VER
return asyncImpl(std::move(Task));
#else
return asyncImpl([Task] (VoidTy) -> VoidTy { Task(); return VoidTy(); });
#endif
}

/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
template <typename Function>
inline std::shared_future<VoidTy> async(Function &&F) {
#ifndef _MSC_VER
return asyncImpl(std::forward<Function>(F));
#else
return asyncImpl([F] (VoidTy) -> VoidTy { F(); return VoidTy(); });
#endif
}

/// Blocking wait for all the threads to complete and the queue to be empty.
/// It is an error to try to add new tasks while blocking on this call.
void wait();

private:
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
std::shared_future<VoidTy> asyncImpl(TaskTy F);

/// Threads in flight
std::vector<llvm::thread> Threads;

/// Tasks waiting for execution in the pool.
std::queue<PackagedTaskTy> Tasks;

/// Locking and signaling for accessing the Tasks queue.
std::mutex QueueLock;
std::condition_variable QueueCondition;

/// Locking and signaling for job completion
std::mutex CompletionLock;
std::condition_variable CompletionCondition;

/// Keep track of the number of thread actually busy
std::atomic<unsigned> ActiveThreads;

#if LLVM_ENABLE_THREADS // avoids warning for unused variable
/// Signal for the destruction of the pool, asking thread to exit.
bool EnableFlag;
#endif
};
}

#endif // LLVM_SUPPORT_THREAD_POOL_H
2 changes: 2 additions & 0 deletions include/llvm/Support/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ typedef std::thread thread;

#else // !LLVM_ENABLE_THREADS

#include <utility>

namespace llvm {

struct thread {
Expand Down
1 change: 1 addition & 0 deletions lib/Support/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ add_llvm_library(LLVMSupport
StringRef.cpp
SystemUtils.cpp
TargetParser.cpp
ThreadPool.cpp
Timer.cpp
ToolOutputFile.cpp
Triple.cpp
Expand Down
146 changes: 146 additions & 0 deletions lib/Support/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//
//
// This file implements a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/ThreadPool.h"

#include "llvm/Config/llvm-config.h"
#include "llvm/Support/raw_ostream.h"

using namespace llvm;

#if LLVM_ENABLE_THREADS

// Default to std::thread::hardware_concurrency
ThreadPool::ThreadPool() : ThreadPool(std::thread::hardware_concurrency()) {}

ThreadPool::ThreadPool(unsigned ThreadCount)
: ActiveThreads(0), EnableFlag(true) {
// Create ThreadCount threads that will loop forever, wait on QueueCondition
// for tasks to be queued or the Pool to be destroyed.
Threads.reserve(ThreadCount);
for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
Threads.emplace_back([&] {
while (true) {
PackagedTaskTy Task;
{
std::unique_lock<std::mutex> LockGuard(QueueLock);
// Wait for tasks to be pushed in the queue
QueueCondition.wait(LockGuard,
[&] { return !EnableFlag || !Tasks.empty(); });
// Exit condition
if (!EnableFlag && Tasks.empty())
return;
// Yeah, we have a task, grab it and release the lock on the queue

// We first need to signal that we are active before popping the queue
// in order for wait() to properly detect that even if the queue is
// empty, there is still a task in flight.
{
++ActiveThreads;
std::unique_lock<std::mutex> LockGuard(CompletionLock);
}
Task = std::move(Tasks.front());
Tasks.pop();
}
// Run the task we just grabbed
#ifndef _MSC_VER
Task();
#else
Task(/* unused */ false);
#endif

{
// Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
std::unique_lock<std::mutex> LockGuard(CompletionLock);
--ActiveThreads;
}

// Notify task completion, in case someone waits on ThreadPool::wait()
CompletionCondition.notify_all();
}
});
}
}

void ThreadPool::wait() {
// Wait for all threads to complete and the queue to be empty
std::unique_lock<std::mutex> LockGuard(CompletionLock);
CompletionCondition.wait(LockGuard,
[&] { return Tasks.empty() && !ActiveThreads; });
}

std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
/// Wrap the Task in a packaged_task to return a future object.
PackagedTaskTy PackagedTask(std::move(Task));
auto Future = PackagedTask.get_future();
{
// Lock the queue and push the new task
std::unique_lock<std::mutex> LockGuard(QueueLock);

// Don't allow enqueueing after disabling the pool
assert(EnableFlag && "Queuing a thread during ThreadPool destruction");

Tasks.push(std::move(PackagedTask));
}
QueueCondition.notify_one();
return Future.share();
}

// The destructor joins all threads, waiting for completion.
ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> LockGuard(QueueLock);
EnableFlag = false;
}
QueueCondition.notify_all();
for (auto &Worker : Threads)
Worker.join();
}

#else // LLVM_ENABLE_THREADS Disabled

ThreadPool::ThreadPool() : ThreadPool(0) {}

// No threads are launched, issue a warning if ThreadCount is not 0
ThreadPool::ThreadPool(unsigned ThreadCount)
: ActiveThreads(0) {
if (ThreadCount) {
errs() << "Warning: request a ThreadPool with " << ThreadCount
<< " threads, but LLVM_ENABLE_THREADS has been turned off\n";
}
}

void ThreadPool::wait() {
// Sequential implementation running the tasks
while (!Tasks.empty()) {
auto Task = std::move(Tasks.front());
Tasks.pop();
Task();
}
}

std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
// Get a Future with launch::deferred execution using std::async
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
// Wrap the future so that both ThreadPool::wait() can operate and the
// returned future can be sync'ed on.
PackagedTaskTy PackagedTask([Future]() { Future.get(); });
Tasks.push(std::move(PackagedTask));
return Future;
}

ThreadPool::~ThreadPool() {
wait();
}

#endif
1 change: 1 addition & 0 deletions unittests/Support/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ add_llvm_unittest(SupportTests
SwapByteOrderTest.cpp
TargetRegistry.cpp
ThreadLocalTest.cpp
ThreadPool.cpp
TimeValueTest.cpp
TrailingObjectsTest.cpp
UnicodeTest.cpp
Expand Down
91 changes: 91 additions & 0 deletions unittests/Support/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"

#include "gtest/gtest.h"

using namespace llvm;
using namespace std::chrono;

/// Try best to make this thread not progress faster than the main thread
static void yield() {
#ifdef LLVM_ENABLE_THREADS
std::this_thread::yield();
#endif
std::this_thread::sleep_for(milliseconds(200));
#ifdef LLVM_ENABLE_THREADS
std::this_thread::yield();
#endif
}

TEST(ThreadPoolTest, AsyncBarrier) {
// test that async & barrier work together properly.

std::atomic_int checked_in{0};

ThreadPool Pool;
for (size_t i = 0; i < 5; ++i) {
Pool.async([&checked_in, i] {
yield();
++checked_in;
});
}
ASSERT_EQ(0, checked_in);
Pool.wait();
ASSERT_EQ(5, checked_in);
}

TEST(ThreadPoolTest, Async) {
ThreadPool Pool;
std::atomic_int i{0};
// sleep here just to ensure that the not-equal is correct.
Pool.async([&i] {
yield();
++i;
});
Pool.async([&i] { ++i; });
ASSERT_NE(2, i.load());
Pool.wait();
ASSERT_EQ(2, i.load());
}

TEST(ThreadPoolTest, GetFuture) {
ThreadPool Pool;
std::atomic_int i{0};
// sleep here just to ensure that the not-equal is correct.
Pool.async([&i] {
yield();
++i;
});
// Force the future using get()
Pool.async([&i] { ++i; }).get();
ASSERT_NE(2, i.load());
Pool.wait();
ASSERT_EQ(2, i.load());
}

TEST(ThreadPoolTest, PoolDestruction) {
// Test that we are waiting on destruction
std::atomic_int checked_in{0};

{
ThreadPool Pool;
for (size_t i = 0; i < 5; ++i) {
Pool.async([&checked_in, i] {
yield();
++checked_in;
});
}
ASSERT_EQ(0, checked_in);
}
ASSERT_EQ(5, checked_in);
}

0 comments on commit abb30d1

Please sign in to comment.