Skip to content

Commit

Permalink
Merge pull request wangzheng0822#45 from Liam0205/09_queue_concurrency
Browse files Browse the repository at this point in the history
[C++][09_queue] 并发相关队列实现
  • Loading branch information
wangzheng0822 authored Oct 12, 2018
2 parents baa4a21 + 038359e commit 0801b5b
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ hs_err_pid*

# editor files
.vscode
.*.swp
82 changes: 82 additions & 0 deletions c-cpp/09_queue/block_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Created by Liam Huang (Liam0205) on 2018/10/11.
*/

#ifndef QUEUE_BLOCK_QUEUE_HPP_
#define QUEUE_BLOCK_QUEUE_HPP_

#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class BlockQueue {
public:
using value_type = T;
using container_type = std::queue<value_type>;
using size_type = typename container_type::size_type;

private:
size_type capacity_ = 0;
container_type container_;
mutable std::mutex mutex_;
mutable std::condition_variable not_empty_;
mutable std::condition_variable not_full_;

public:
BlockQueue() = delete;
BlockQueue(const size_type capacity) : capacity_(capacity) {}
BlockQueue(const BlockQueue&) = default;
BlockQueue(BlockQueue&&) = default;
BlockQueue& operator=(const BlockQueue&) = default;
BlockQueue& operator=(BlockQueue&&) = default;

private:
bool empty() const { return container_.empty(); }
bool full() const { return not(container_.size() < capacity_); }

public:
void put(const value_type& item) {
std::unqiue_lock<std::mutex> lock(mutex_);
while (full()) {
not_full_.wait(lock);
}
container_.push(item);
not_empty_.notify_one();
}
void take(value_type& out) {
std::unique_lock<std::mutex> lock(mutex_);
while (empty()) {
not_empty_.wait(lock);
}
out = container_.front();
container_.pop();
not_full_.notify_one();
}
template <typename Duration>
bool put_for(const value_type& item, const Duration& d) {
std::unqiue_lock<std::mutex> lock(mutex_);
if (not_full_.wait_for(lock, d, [&](){ return not full(); })) {
container_.push(item);
not_empty_.notify_one();
return true;
} else {
return false;
}
}
template <typename Duration>
bool take_for(const Duration& d, value_type& out) {
std::unique_lock<std::mutex> lock(mutex_);
if (not_empty_.wait_for(lock, d, [&](){ return not empty(); })) {
out = container_.front();
container_.pop();
not_full_.notify_one();
return true;
} else {
return false;
}
}
};

#endif // QUEUE_BLOCK_QUEUE_HPP_

85 changes: 85 additions & 0 deletions c-cpp/09_queue/concurrency_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Created by Liam Huang (Liam0205) on 2018/10/11.
*/

#ifndef QUEUE_CONCURRENCY_QUEUE_HPP_
#define QUEUE_CONCURRENCY_QUEUE_HPP_

#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>

template <typename T>
class ConcurrencyQueue {
public:
using value_type = T;
using container_type = std::queue<value_type>;
using size_type = typename container_type::size_type;

private:
container_type container_;
mutable std::mutex mutex_;
std::condition_variable container_cond_;

public:
ConcurrencyQueue() = default;
ConcurrencyQueue(const ConcurrencyQueue&) = default;
ConcurrencyQueue(ConcurrencyQueue&&) = default;
ConcurrencyQueue& operator=(const ConcurrencyQueue&) = default;
ConcurrencyQueue& operator=(ConcurrencyQueue&&) = default;

private:
bool empty_() const { return container_.empty(); }

public:
bool empty() const {
std::lock_guard<std::mutex> lg(mutex_);
return container_.empty();
}
void push(value_type item) {
std::lock_guard<std::mutex> lg(mutex_);
container_.push(std::move(item));
container_cond_.notify_one();
}
void wait_and_pop(value_type& out) {
std::unique_lock<std::mutex> lk(mutex_);
while (empty_()) {
container_cond_.wait(lk)
}
out = std::move(container_.front());
container_.pop();
}
std::shared_ptr<value_type> wait_and_pop() {
std::unique_lock<std::mutex> lk(mutex_);
while (empty_()) {
container_cond_.wait(lk)
}
auto res = std::make_shared<value_type>(std::move(container_.front()));
container_.pop();
return res;
}
bool try_pop(value_type& out) {
std::lock_guard<std::mutex> lg(mutex_);
if (empty_()) {
return false;
} else {
out = std::move(container_.front());
container_.pop();
return true;
}
}
std::shared_ptr<value_type> try_pop() {
std::lock_guard<std::mutex> lg(mutex_);
if (empty_()) {
return nullptr;
} else {
auto res = std::make_shared<value_type>(std::move(container_.front()));
container_.pop();
return res;
}
}
};

#endif // QUEUE_CONCURRENCY_QUEUE_HPP_

84 changes: 84 additions & 0 deletions c-cpp/09_queue/lock_free_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Created by Liam Huang (Liam0205) on 2018/10/11.
*/

#ifndef QUEUE_LOCK_FREE_QUEUE_HPP_
#define QUEUE_LOCK_FREE_QUEUE_HPP_

#include <memory>
#include <atomic>

template <typename T>
class LockFreeQueue {
public:
using value_type = T;

private:
struct node {
std::shared<value_type> data = nullptr;
node* next = nullptr;
};
std::atomic<node*> head = nullptr;
std::atomic<node*> tail = nullptr;

public:
LockFreeQueue() head(new node), tail(head.load()) {}
LockFreeQueue(const LockFreeQueue&) = delete;
LockFreeQueue(LockFreeQueue&& other) : head(other.head.load()), tail(other.tail.load()) {
other.head.store(nullptr);
other.tail.store(nullptr);
}
LockFreeQueue& operator=(const LockFreeQueue&) = delete;
LockFreeQueue& operator=(LockFreeQueue&& rhs) {
while (node* const old_head = head.load()) {
head.store(old_head->next);
delete old_head;
}
head.store(rhs.head.load());
tail.store(rhs.tail.load());
rhs.head.store(nullptr);
rhs.tail.store(nullptr);
}
~LockFreeQueue() {
while (node* const old_head = head.load()) {
head.store(old_head->next);
delete old_head;
}
}

private:
node* pop_head() {
node* const res = head.load();
if (res == tail.load()) {
return nullptr;
}
head.store(res->next);
return res;
}

public:
bool empty() const {
return head.load() == tail.load();
}
std::shared_ptr<value_type> pop() {
node* old_head = pop_head();
if (nullptr == old_head) {
return nullptr;
} else {
auto res = old_head->data;
delete old_head;
return res;
}
}
void push(value_type new_value) {
auto new_data = std::make_shared<value_type>(new_value);
node* p = new node;
node* old_tail = tail.load();
old_tail->data.swap(new_data);
old_tail->next = p;
tail_.store(p);
}
};

#endif // QUEUE_LOCK_FREE_QUEUE_HPP_

0 comments on commit 0801b5b

Please sign in to comment.