-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathIThreadPool.cpp
132 lines (122 loc) · 4.21 KB
/
IThreadPool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* IThreadPool.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/IThreadPool.h"
#include <algorithm>
// The ifndef's allow us to compile with pre-built boost. Otherwise, we get
// errors about double-defines. As of this writing, the automatically downloaded
// build of boost doesn't define these, but the pre-built version does. (The old
// prebuilt version was being silently ignored by cmake, so it's unclear when the
// compatibility divergence started)
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#ifndef BOOST_DATE_TIME_NO_LIB
#define BOOST_DATE_TIME_NO_LIB
#endif
#ifndef BOOST_REGEX_NO_LIB
#define BOOST_REGEX_NO_LIB
#endif
#include "boost/asio.hpp"
class ThreadPool final : public IThreadPool, public ReferenceCounted<ThreadPool> {
struct Thread {
ThreadPool* pool;
IThreadPoolReceiver* userObject;
THREAD_HANDLE handle; // Owned by main thread
static thread_local IThreadPoolReceiver* threadUserObject;
explicit Thread(ThreadPool* pool, IThreadPoolReceiver* userObject) : pool(pool), userObject(userObject) {}
~Thread() { ASSERT_ABORT(!userObject); }
void run() {
setThreadPriority(pool->priority());
threadUserObject = userObject;
try {
userObject->init();
while (pool->ios.run_one() && (pool->mode == Mode::Run))
;
} catch (Error& e) {
TraceEvent(SevError, "ThreadPoolError").error(e);
}
delete userObject;
userObject = nullptr;
}
static void dispatch(PThreadAction action) { (*action)(threadUserObject); }
};
THREAD_FUNC start(void* p) {
((Thread*)p)->run();
THREAD_RETURN;
}
std::vector<Thread*> threads;
boost::asio::io_service ios;
boost::asio::io_service::work dontstop;
enum Mode { Run = 0, Shutdown = 2 };
volatile int mode;
int stackSize;
int pri;
struct ActionWrapper {
PThreadAction action;
ActionWrapper(PThreadAction action) : action(action) {}
// HACK: Boost won't use move constructors, so we just assume the last copy made is the one that will be called
// or cancelled
ActionWrapper(ActionWrapper const& r) : action(r.action) { const_cast<ActionWrapper&>(r).action = nullptr; }
void operator()() {
Thread::dispatch(action);
action = nullptr;
}
~ActionWrapper() {
if (action) {
action->cancel();
}
}
ActionWrapper& operator=(ActionWrapper const&) = delete;
};
public:
ThreadPool(int stackSize, int pri) : dontstop(ios), mode(Run), stackSize(stackSize), pri(pri) {}
~ThreadPool() override {}
Future<Void> stop(Error const& e = success()) override {
if (mode == Shutdown)
return Void();
ReferenceCounted<ThreadPool>::addref();
ios.stop(); // doesn't work?
mode = Shutdown;
for (int i = 0; i < threads.size(); i++) {
waitThread(threads[i]->handle);
delete threads[i];
}
ReferenceCounted<ThreadPool>::delref();
return Void();
}
Future<Void> getError() const override { return Never(); } // FIXME
void addref() override { ReferenceCounted<ThreadPool>::addref(); }
void delref() override {
if (ReferenceCounted<ThreadPool>::delref_no_destroy()) {
stop();
delete this;
}
}
void addThread(IThreadPoolReceiver* userData, const char* name) override {
threads.push_back(new Thread(this, userData));
threads.back()->handle = g_network->startThread(start, threads.back(), stackSize, name);
}
void post(PThreadAction action) override { ios.post(ActionWrapper(action)); }
int priority() const { return pri; }
};
Reference<IThreadPool> createGenericThreadPool(int stackSize, int pri) {
return Reference<IThreadPool>(new ThreadPool(stackSize, pri));
}
thread_local IThreadPoolReceiver* ThreadPool::Thread::threadUserObject;