Skip to content
This repository has been archived by the owner on May 13, 2018. It is now read-only.

Commit

Permalink
Add another post example.
Browse files Browse the repository at this point in the history
  • Loading branch information
chriskohlhoff committed Sep 8, 2014
1 parent e91abff commit 7a03412
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/examples/executor/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fork_join
pipeline
post_1
post_2
post_3
priority_scheduler
sort_1
sort_2
Expand Down
2 changes: 2 additions & 0 deletions src/examples/executor/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ noinst_PROGRAMS = \
pipeline \
post_1 \
post_2 \
post_3 \
priority_scheduler \
sort_1 \
sort_2 \
Expand All @@ -40,6 +41,7 @@ fork_join_SOURCES = fork_join.cpp
pipeline_SOURCES = pipeline.cpp
post_1_SOURCES = post_1.cpp
post_2_SOURCES = post_2.cpp
post_3_SOURCES = post_3.cpp
priority_scheduler_SOURCES = priority_scheduler.cpp
sort_1_SOURCES = sort_1.cpp
sort_2_SOURCES = sort_2.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/examples/executor/post_1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ int main(int argc, char* argv[])

std::vector<work> work_list(std::atoi(argv[1]));

// Spawn a bunch of asynchronous tasks and wait for them to complete.
// Start a bunch of asynchronous tasks and wait for them to complete.
latch l(work_list.size());
for (auto work: work_list)
{
Expand Down
2 changes: 1 addition & 1 deletion src/examples/executor/post_2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ int main(int argc, char* argv[])

std::vector<work> work_list(std::atoi(argv[1]));

// Spawn a large amount of work and join on the pool to complete.
// Start a large amount of work and join on the pool to complete.
thread_pool tp(16);
for (auto work: work_list)
{
Expand Down
150 changes: 150 additions & 0 deletions src/examples/executor/post_3.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include <algorithm>
#include <cassert>
#include <cctype>
#include <condition_variable>
#include <cstdlib>
#include <experimental/executor>
#include <experimental/strand>
#include <experimental/thread_pool>
#include <fstream>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using std::experimental::executor;
using std::experimental::executor_work;
using std::experimental::make_work;
using std::experimental::post;
using std::experimental::strand;
using std::experimental::system_executor;
using std::experimental::thread_pool;
using std::experimental::wrap;

class latch
{
public:
explicit latch(std::size_t initial_count)
: count_(initial_count)
{
}

void arrive()
{
std::unique_lock<std::mutex> lock(mutex_);
assert(count_ > 0);
if (--count_ == 0)
condition_.notify_all();
}

void wait()
{
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this]{ return count_ == 0; });
}

private:
std::mutex mutex_;
std::condition_variable condition_;
std::size_t count_;
};

class notifying_latch
{
public:
template <class Handler>
explicit notifying_latch(std::size_t initial_count, Handler handler)
: count_(initial_count),
handler_([work=make_work(handler), handler]{
post(work.get_executor(), handler);
})
{
}

void arrive()
{
std::unique_lock<std::mutex> lock(mutex_);
assert(count_ > 0);
if (--count_ == 0)
{
handler_();
handler_ = nullptr;
}
}

private:
std::mutex mutex_;
std::size_t count_;
std::function<void()> handler_;
};

std::vector<char> read_file(const std::string& filename)
{
std::ifstream file(filename.c_str());
std::istreambuf_iterator<char> iter(file.rdbuf()), end;
return std::vector<char>(iter, end);
}

std::vector<char> process_content(std::vector<char> content)
{
for (char& c: content)
c = std::toupper(c);
return std::move(content);
}

void append_content(std::vector<char>& dest, std::vector<char> source)
{
dest.insert(dest.end(), source.begin(), source.end());
}

void render_content(const std::vector<char>& content)
{
std::ostreambuf_iterator<char> iter(std::cout.rdbuf());
std::copy(content.begin(), content.end(), iter);
}

int main(int argc, char* argv[])
{
if (argc < 2)
{
std::cerr << "Usage: post_3 <files...>\n";
return 1;
}

std::vector<std::string> src_files;
for (int i = 1; i < argc; ++i)
src_files.push_back(argv[i]);

thread_pool file_pool(8);
std::vector<std::vector<char>> content_results;
latch done(src_files.size());
for (auto src: src_files)
{
// Post file read on file_pool and content processing on system_executor.
post(file_pool, [src, &content_results, &done]{
post([content=read_file(src), &content_results, &done]{
auto result = process_content(std::move(content));
content_results.push_back(std::move(result));
done.arrive();
});
});
}
done.wait();

std::vector<char> out;
strand<system_executor> merge_executor;
thread_pool ui_executor(1);
notifying_latch nl(content_results.size(),
wrap(ui_executor, [&out]{ render_content(out); }));
for (auto& result: content_results)
{
post(merge_executor, [&out, &result, &nl]{
append_content(out, result);
nl.arrive();
});
}
ui_executor.join();
}

0 comments on commit 7a03412

Please sign in to comment.