-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread_pool.hh
140 lines (108 loc) · 3.74 KB
/
thread_pool.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#ifndef UTIL_THREAD_POOL_H
#define UTIL_THREAD_POOL_H
#include "util/pcqueue.hh"
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <cstdlib>
namespace util {
template <class HandlerT> class Worker : boost::noncopyable {
public:
typedef HandlerT Handler;
typedef typename Handler::Request Request;
template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, const Request &poison)
: in_(in), handler_(construct), poison_(poison), thread_(boost::ref(*this)) {}
// Only call from thread.
void operator()() {
Request request;
while (1) {
in_.Consume(request);
if (request == poison_) return;
try {
(*handler_)(request);
}
catch(const std::exception &e) {
std::cerr << "Handler threw " << e.what() << std::endl;
abort();
}
catch(...) {
std::cerr << "Handler threw an exception, dropping request" << std::endl;
abort();
}
}
}
void Join() {
thread_.join();
}
private:
PCQueue<Request> &in_;
boost::optional<Handler> handler_;
const Request poison_;
boost::thread thread_;
};
template <class HandlerT> class ThreadPool : boost::noncopyable {
public:
typedef HandlerT Handler;
typedef typename Handler::Request Request;
template <class Construct> ThreadPool(std::size_t queue_length, std::size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) {
for (size_t i = 0; i < workers; ++i) {
workers_.push_back(new Worker<Handler>(in_, handler_construct, poison));
}
}
~ThreadPool() {
for (std::size_t i = 0; i < workers_.size(); ++i) {
Produce(poison_);
}
for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) {
i->Join();
}
}
void Produce(const Request &request) {
in_.Produce(request);
}
// For adding to the queue.
PCQueue<Request> &In() { return in_; }
private:
PCQueue<Request> in_;
boost::ptr_vector<Worker<Handler> > workers_;
Request poison_;
};
template <class Handler> class RecyclingHandler {
public:
typedef typename Handler::Request Request;
template <class Construct> RecyclingHandler(PCQueue<Request> &recycling, Construct &handler_construct)
: inner_(handler_construct), recycling_(recycling) {}
void operator()(Request &request) {
inner_(request);
recycling_.Produce(request);
}
private:
Handler inner_;
PCQueue<Request> &recycling_;
};
template <class HandlerT> class RecyclingThreadPool : boost::noncopyable {
public:
typedef HandlerT Handler;
typedef typename Handler::Request Request;
// Remember to call PopulateRecycling afterwards in most cases.
template <class Construct> RecyclingThreadPool(std::size_t queue, std::size_t workers, Construct handler_construct, Request poison)
: recycling_(queue), pool_(queue, workers, RecyclingHandler<Handler>(recycling_, handler_construct), poison) {}
// Initialization: put stuff into the recycling queue. This could also be
// done by calling Produce without Consume, but it's often easier to
// initialize with PopulateRecycling then do a Consume/Produce loop.
void PopulateRecycling(const Request &request) {
recycling_.Produce(request);
}
Request Consume() {
return recycling_.Consume();
}
void Produce(const Request &request) {
pool_.Produce(request);
}
private:
PCQueue<Request> recycling_;
ThreadPool<RecyclingHandler<Handler> > pool_;
};
} // namespace util
#endif // UTIL_THREAD_POOL_H