Skip to content

Commit

Permalink
Merge pull request #24 from dmlc/concurrency
Browse files Browse the repository at this point in the history
Concurrency
  • Loading branch information
mli committed Jul 21, 2015
2 parents 436720f + d410533 commit 53f5a9a
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 6 deletions.
158 changes: 158 additions & 0 deletions include/dmlc/concurrency.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*!
* Copyright (c) 2015 by Contributors
* \file concurrency.h
* \brief thread-safe data structures.
* \author Yutian Li
*/
#ifndef DMLC_CONCURRENCY_H_
#define DMLC_CONCURRENCY_H_
// this code depends on c++11
#if DMLC_USE_CXX11
#include <atomic>
#include <list>
#include <mutex>
#include <condition_variable>

namespace dmlc {

/*!
* \brief Simple userspace spinlock implementation.
*/
class Spinlock {
public:
Spinlock() = default;
/*!
* \brief Disable copy and move.
*/
Spinlock(Spinlock const&) = delete;
Spinlock(Spinlock&&) = delete;
Spinlock& operator=(Spinlock const&) = delete;
Spinlock& operator=(Spinlock&&) = delete;
~Spinlock() = default;
/*!
* \brief Acquire lock.
*/
inline void lock() noexcept;
/*!
* \brief Release lock.
*/
inline void unlock() noexcept;

private:
std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
};

/*!
* \brief Cocurrent blocking queue.
*/
template<typename T>
class ConcurrentBlockingQueue {
public:
ConcurrentBlockingQueue() = default;
/*!
* \brief Disable copy and move.
*/
ConcurrentBlockingQueue(ConcurrentBlockingQueue const&) = delete;
ConcurrentBlockingQueue(ConcurrentBlockingQueue&&) = delete;
ConcurrentBlockingQueue& operator=(ConcurrentBlockingQueue const&) = delete;
ConcurrentBlockingQueue& operator=(ConcurrentBlockingQueue&&) = delete;
~ConcurrentBlockingQueue() = default;
/*!
* \brief Push element into the queue.
* It will copy or move the element into the queue, depending on the type of the parameter.
* \param e Element to push into.
*/
template<typename E> void Push(E&& e);
/*!
* \brief Pop element from the queue.
* The element will be copied or moved into the object passed in.
* \param e Element popped.
* \return Whether the queue is not empty afterwards.
*/
bool Pop(T* rv);
/*!
* \brief Pop everything.
* \return The queue.
*/
std::list<T> PopAll();
/*!
* \brief Signal the queue for destruction.
* After calling this method, all blocking pop call to the queue will return false.
*/
void SignalForKill();
/*!
* \brief Get the size of the queue.
* \return The size of the queue.
*/
size_t Size();

private:
std::mutex mutex_;
std::condition_variable cv_;
std::atomic<bool> exit_now_{false};
std::list<T> queue_;
};

inline void Spinlock::lock() noexcept {
while (lock_.test_and_set(std::memory_order_acquire)) {
}
}

inline void Spinlock::unlock() noexcept {
lock_.clear(std::memory_order_release);
}

template<typename T>
template<typename E>
void ConcurrentBlockingQueue<T>::Push(E&& e) {
static_assert(
std::is_same<typename std::remove_cv<
typename std::remove_reference<E>::type>::type, T>::value,
"Types must match.");
std::lock_guard<std::mutex> lock{mutex_};
queue_.emplace_back(std::forward<E>(e));
if (queue_.size() == 1) {
cv_.notify_all();
}
}

template<typename T>
bool ConcurrentBlockingQueue<T>::Pop(T* rv) {
std::unique_lock<std::mutex> lock{mutex_};
while (queue_.empty() && !exit_now_.load()) {
cv_.wait(lock);
}
if (!exit_now_.load()) {
*rv = std::move(queue_.front());
queue_.pop_front();
return true;
} else {
return false;
}
}

template<typename T>
std::list<T> ConcurrentBlockingQueue<T>::PopAll() {
std::lock_guard<std::mutex> lock{mutex_};
std::list<T> rv;
rv.swap(queue_);
return rv;
}

template<typename T>
void ConcurrentBlockingQueue<T>::SignalForKill() {
std::unique_lock<std::mutex> lock{mutex_};
exit_now_.store(true);
cv_.notify_all();
}

template<typename T>
size_t ConcurrentBlockingQueue<T>::Size() {
std::unique_lock<std::mutex> lock{mutex_};
return queue_.size();
}

} // namespace dmlc

#endif // DMLC_USE_CXX11
#endif // DMLC_CONCURRENCY_H_
6 changes: 3 additions & 3 deletions include/dmlc/omp.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
#else
#ifndef DISABLE_OPENMP
// use pragma message instead of warning
#pragma message("Warning: OpenMP is not available," \
"project will be compiled into single-thread code." \
"Use OpenMP-enabled compiler to get benefit of multi-threading")
#pragma message("Warning: OpenMP is not available, " \
"project will be compiled into single-thread code. " \
"Use OpenMP-enabled compiler to get benefit of multi-threading.")
#endif
//! \cond Doxygen_Suppress
inline int omp_get_thread_num() { return 0; }
Expand Down
9 changes: 6 additions & 3 deletions make/dmlc.mk
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ endif
ifndef NO_OPENMP
DMLC_CFLAGS += -fopenmp
DMLC_LDFLAGS += -fopenmp
DMLC_LDFLAGS += -lrt
else
DMLC_LDFLAGS += -lrt # there is no -lrt on mac os x
endif

# Mac OS X does not support "-lrt" flag
OS := $(shell uname -s)
ifneq ($(OS), Darwin)
DMLC_LDFLAGS += -lrt
endif

# handle fpic options
Expand Down

0 comments on commit 53f5a9a

Please sign in to comment.