-
Notifications
You must be signed in to change notification settings - Fork 62
/
Copy pathcancellable_cpu_executor.h
109 lines (93 loc) · 3.41 KB
/
cancellable_cpu_executor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/*
* Copyright 2022-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/
#pragma once
#include "globaltask.h"
#include <folly/concurrency/UnboundedQueue.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
/**
* Custom folly CPU thread pool executor which puts work in a separate queue
* that we control so that we can remove tasks as and when a bucket goes away
* without having to actually run them (seen to noticeably slow down bucket
* deinitialization which can cause rebalance failures).
*/
class CancellableCPUExecutor {
public:
    /**
     * Construct the executor on top of a folly::CPUThreadPoolExecutor.
     *
     * @param numThreads forwarded to the underlying pool's constructor
     * @param threadFactory moved into the underlying pool's constructor
     */
    CancellableCPUExecutor(size_t numThreads,
                           std::shared_ptr<folly::ThreadFactory> threadFactory)
        : cpuPool(numThreads, std::move(threadFactory)) {
    }

    // ---- Submitting and cancelling work --------------------------------

    /**
     * Queue a piece of work for execution.
     *
     * @param task the GlobalTask the work belongs to; it supplies the
     *             taskable and taskId that this class needs. May be null
     *             when the work is not strictly tied to a GlobalTask (for
     *             example the reset of a task pointer).
     * @param func the callable to execute (it is what runs the task)
     */
    void add(GlobalTask* task, folly::Func func);

    /**
     * Drop every queued task that belongs to the given taskable. Must always
     * be called on the SchedulerPool (EventBase) so that nothing can be
     * scheduled on this pool while the removal is in progress.
     *
     * @param taskable the taskable whose tasks should be removed
     * @return vector of GlobalTask* for this taskable
     */
    std::vector<GlobalTask*> removeTasksForTaskable(const Taskable& taskable);

    // ---- Lifecycle -----------------------------------------------------

    /// Calls join() on the underlying pool.
    void join() {
        cpuPool.join();
    }

    // ---- Thread count and statistics (all forwarded to the pool) -------

    /// @return the value of numThreads() on the underlying pool
    size_t numThreads() const {
        return cpuPool.numThreads();
    }

    /// Forward the requested thread count to the underlying pool.
    void setNumThreads(size_t numThreads) {
        cpuPool.setNumThreads(numThreads);
    }

    /// @return the value of getPoolStats() on the underlying pool
    folly::ThreadPoolExecutor::PoolStats getPoolStats() const {
        return cpuPool.getPoolStats();
    }

    /// @return the value of getPendingTaskCount() on the underlying pool
    size_t getPendingTaskCount() const {
        return cpuPool.getPendingTaskCount();
    }

    /// @return the value of getTaskQueueSize() on the underlying pool
    size_t getTaskQueueSize() const {
        return cpuPool.getTaskQueueSize();
    }

private:
    /**
     * A single entry in our work queue: the (optional) owning task plus the
     * callable to execute.
     */
    struct QueueElem {
        QueueElem(GlobalTask* owner, folly::Func work)
            : task(owner), func(std::move(work)) {
        }

        /// @return true when this entry has no associated GlobalTask
        bool isInternalExecutorTask() const {
            return task == nullptr;
        }

        // A GlobalTask* is stored because it carries both the Taskable
        // reference and the taskId, which are what we need in order to
        // cancel tasks. A raw GlobalTask* is used rather than an ExTask to
        // avoid shared_ptr promotions, for efficiency. It has to be
        // nullable because some entries (the reset of task ptrs on the
        // cpuPool) have no GlobalTask behind them.
        GlobalTask* task;

        // The work to execute for this entry.
        folly::Func func;
    };

    /**
     * The queue of work to run; each element contains the actual work we
     * want to execute. This is the same queue type that
     * folly::CPUThreadPoolExecutor uses.
     */
    folly::UMPMCQueue<QueueElem, false, 6> tasks;

    /**
     * The underlying CPUThreadPoolExecutor. It runs "notification" tasks,
     * each of which runs a task from our tasks queue.
     *
     * NOTE(review): declared after `tasks`, so it is constructed last and
     * destroyed first. Presumably this ensures the pool is torn down before
     * the queue it reads from - confirm before reordering the members.
     */
    folly::CPUThreadPoolExecutor cpuPool;
};