/*
* Copyright (c) 2020 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/logging/log.h>
#include <zephyr/sys/p4wq.h>
#include <zephyr/kernel.h>
#include <zephyr/init.h>
#include <zephyr/sys/iterable_sections.h>
/* private kernel APIs */
#include <ksched.h>
#include <wait_q.h>
LOG_MODULE_REGISTER(p4wq, CONFIG_LOG_DEFAULT_LEVEL);
struct device;
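
/*
 * P4WQ: a "pooled parallel preemptible priority" work queue.  Pending
 * items live in an rbtree ordered by priority and then deadline, and a
 * pool of worker threads adopts the priority/deadline of whatever item
 * it is running, so the scheduler sees the work rather than the worker.
 *
 * Illustrative usage sketch (not part of this translation unit; it
 * assumes the K_P4WQ_DEFINE() helper and the struct k_p4wq_work fields
 * declared in <zephyr/sys/p4wq.h>, with example priority/stack values):
 *
 *	K_P4WQ_DEFINE(my_pool, 2, 1024);
 *
 *	static void my_handler(struct k_p4wq_work *work) { ... }
 *
 *	static struct k_p4wq_work item = {
 *		.priority = 5,       // thread priority while the item runs
 *		.deadline = 0,       // relative cycles, see k_p4wq_submit()
 *		.handler = my_handler,
 *		.sync = true,        // allow blocking in k_p4wq_wait()
 *	};
 *
 *	k_p4wq_submit(&my_pool, &item);
 *	k_p4wq_wait(&item, K_FOREVER);
 */

/* Make the worker thread adopt the priority and deadline of the work
 * item it is about to run.
 */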
static void set_prio(struct k_thread *th, struct k_p4wq_work *item)
{
__ASSERT_NO_MSG(!IS_ENABLED(CONFIG_SMP) || !z_is_thread_queued(th));
th->base.prio = item->priority;
th->base.prio_deadline = item->deadline;
}
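
/* rbtree comparator for pending work: the most urgent item (by
 * priority, then earliest deadline) sorts toward the tree maximum,
 * which is what p4wq_loop() dequeues; the node address breaks any
 * remaining ties so the relation is a strict total order.
 */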
static bool rb_lessthan(struct rbnode *a, struct rbnode *b)
{
struct k_p4wq_work *aw = CONTAINER_OF(a, struct k_p4wq_work, rbnode);
struct k_p4wq_work *bw = CONTAINER_OF(b, struct k_p4wq_work, rbnode);
if (aw->priority != bw->priority) {
return aw->priority > bw->priority;
}
if (aw->deadline != bw->deadline) {
return aw->deadline - bw->deadline > 0;
}
return (uintptr_t)a < (uintptr_t)b;
}
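
/* The K_CALLBACK_STATE bit in user_options is borrowed to record that
 * the item currently running on this thread resubmitted itself from
 * its own handler, so p4wq_loop() knows not to retire it.
 */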
static void thread_set_requeued(struct k_thread *th)
{
th->base.user_options |= K_CALLBACK_STATE;
}
static void thread_clear_requeued(struct k_thread *th)
{
th->base.user_options &= ~K_CALLBACK_STATE;
}
static bool thread_was_requeued(struct k_thread *th)
{
return !!(th->base.user_options & K_CALLBACK_STATE);
}

/* Slightly different semantics from rb_lessthan above: the rbtree
 * comparator must totally order the nodes (to produce a single tree
 * structure) and so falls back on the pointer value to break ties
 * where priorities and deadlines are equal; here we tolerate equality
 * as meaning "not lessthan".
 */
static inline bool item_lessthan(struct k_p4wq_work *a, struct k_p4wq_work *b)
{
if (a->priority > b->priority) {
return true;
} else if ((a->priority == b->priority) &&
(a->deadline != b->deadline)) {
return a->deadline - b->deadline > 0;
} else {
;
}
return false;
}
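
/* Worker thread entry point: repeatedly pull the most urgent pending
 * item, adopt its priority, run its handler outside the lock, then
 * retire the item unless its handler resubmitted it.  With nothing
 * pending, the thread pends on the queue's waitq until k_p4wq_submit()
 * wakes it.
 */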
static FUNC_NORETURN void p4wq_loop(void *p0, void *p1, void *p2)
{
ARG_UNUSED(p1);
ARG_UNUSED(p2);
struct k_p4wq *queue = p0;
k_spinlock_key_t k = k_spin_lock(&queue->lock);
while (true) {
struct rbnode *r = rb_get_max(&queue->queue);
if (r) {
struct k_p4wq_work *w
= CONTAINER_OF(r, struct k_p4wq_work, rbnode);
rb_remove(&queue->queue, r);
w->thread = _current;
sys_dlist_append(&queue->active, &w->dlnode);
set_prio(_current, w);
thread_clear_requeued(_current);
k_spin_unlock(&queue->lock, k);
w->handler(w);
k = k_spin_lock(&queue->lock);
/* Remove from the active list only if it
* wasn't resubmitted already
*/
if (!thread_was_requeued(_current)) {
sys_dlist_remove(&w->dlnode);
w->thread = NULL;
k_sem_give(&w->done_sem);
}
} else {
z_pend_curr(&queue->lock, k, &queue->waitq, K_FOREVER);
k = k_spin_lock(&queue->lock);
}
}
}

/* Must be called to regain ownership of the work item.  For an item
 * submitted with sync set, this blocks for up to the given timeout;
 * otherwise it returns immediately: 0 if the item has completed,
 * -EBUSY if it is still outstanding.
 */
int k_p4wq_wait(struct k_p4wq_work *work, k_timeout_t timeout)
{
if (work->sync) {
return k_sem_take(&work->done_sem, timeout);
}
return k_sem_count_get(&work->done_sem) ? 0 : -EBUSY;
}
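
/* Runtime initialization of a queue: empty pending rbtree, empty
 * active list, and a wait queue for idle worker threads.
 */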
void k_p4wq_init(struct k_p4wq *queue)
{
memset(queue, 0, sizeof(*queue));
z_waitq_init(&queue->waitq);
queue->queue.lessthan_fn = rb_lessthan;
sys_dlist_init(&queue->active);
}
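
/* Add a worker thread to the pool.  The thread enters p4wq_loop()
 * immediately, unless the queue uses K_P4WQ_DELAYED_START, in which
 * case it is created with a K_FOREVER start delay and must be started
 * later (see k_p4wq_enable_static_thread()).
 */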
void k_p4wq_add_thread(struct k_p4wq *queue, struct k_thread *thread,
k_thread_stack_t *stack,
size_t stack_size)
{
k_thread_create(thread, stack, stack_size,
p4wq_loop, queue, NULL, NULL,
K_HIGHEST_THREAD_PRIO, 0,
queue->flags & K_P4WQ_DELAYED_START ? K_FOREVER : K_NO_WAIT);
}
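
/* Boot-time initialization of statically defined queues (see the
 * K_P4WQ_DEFINE() family): initialize each queue, record its flags,
 * and spawn its threads, deferring thread startup when a user CPU
 * mask still has to be applied.
 */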
static int static_init(void)
{
STRUCT_SECTION_FOREACH(k_p4wq_initparam, pp) {
for (int i = 0; i < pp->num; i++) {
uintptr_t ssz = K_THREAD_STACK_LEN(pp->stack_size);
struct k_p4wq *q = pp->flags & K_P4WQ_QUEUE_PER_THREAD ?
pp->queue + i : pp->queue;
if (!i || (pp->flags & K_P4WQ_QUEUE_PER_THREAD)) {
k_p4wq_init(q);
}
q->flags = pp->flags;
/*
* If the user wants to specify CPU affinity, we have to
* delay starting threads until that has been done
*/
if (q->flags & K_P4WQ_USER_CPU_MASK) {
q->flags |= K_P4WQ_DELAYED_START;
}
k_p4wq_add_thread(q, &pp->threads[i],
&pp->stacks[ssz * i],
pp->stack_size);
if (pp->flags & K_P4WQ_DELAYED_START) {
z_mark_thread_as_suspended(&pp->threads[i]);
}
#ifdef CONFIG_SCHED_CPU_MASK
if (pp->flags & K_P4WQ_USER_CPU_MASK) {
int ret = k_thread_cpu_mask_clear(&pp->threads[i]);
if (ret < 0) {
LOG_ERR("Couldn't clear CPU mask: %d", ret);
}
}
#endif
}
}
return 0;
}
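
/* Called by the application, after it has had a chance to configure
 * CPU affinity, to apply the requested mask and start a thread that
 * was created with K_P4WQ_DELAYED_START.
 */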
void k_p4wq_enable_static_thread(struct k_p4wq *queue, struct k_thread *thread,
uint32_t cpu_mask)
{
#ifdef CONFIG_SCHED_CPU_MASK
if (queue->flags & K_P4WQ_USER_CPU_MASK) {
unsigned int i;
while ((i = find_lsb_set(cpu_mask))) {
int ret = k_thread_cpu_mask_enable(thread, i - 1);
if (ret < 0) {
LOG_ERR("Couldn't set CPU mask for %u: %d", i, ret);
}
cpu_mask &= ~BIT(i - 1);
}
}
#endif
if (queue->flags & K_P4WQ_DELAYED_START) {
z_mark_thread_as_not_suspended(thread);
k_thread_start(thread);
}
}

/* We spawn a bunch of high priority threads.  They are brought up from
 * a late (APPLICATION level, priority 99) SYS_INIT, after the kernel
 * has finished initializing.
 */
SYS_INIT(static_init, APPLICATION, 99);
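
/* Submit a work item.  The item's deadline is converted from a delta
 * to an absolute cycle count, resubmission from within the item's own
 * handler is detected and handled, and an idle worker is woken (with
 * the item's priority) only when fewer than arch_num_cpus() of the
 * currently active items already out-rank the new one.
 */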
void k_p4wq_submit(struct k_p4wq *queue, struct k_p4wq_work *item)
{
k_spinlock_key_t k = k_spin_lock(&queue->lock);
/* Input is a delta time from now (to match
* k_thread_deadline_set()), but we store and use the absolute
* cycle count.
*/
item->deadline += k_cycle_get_32();
/* Resubmission from within handler? Remove from active list */
if (item->thread == _current) {
sys_dlist_remove(&item->dlnode);
thread_set_requeued(_current);
item->thread = NULL;
} else {
k_sem_init(&item->done_sem, 0, 1);
}
__ASSERT_NO_MSG(item->thread == NULL);
rb_insert(&queue->queue, &item->rbnode);
item->queue = queue;
/* If there were other items already ahead of it in the queue,
* then we don't need to revisit active thread state and can
* return.
*/
if (rb_get_max(&queue->queue) != &item->rbnode) {
goto out;
}
/* Check the list of active (running or preempted) items, if
* there are at least an "active target" of those that are
* higher priority than the new item, then no one needs to be
* preempted and we can return.
*/
struct k_p4wq_work *wi;
uint32_t n_beaten_by = 0, active_target = arch_num_cpus();
SYS_DLIST_FOR_EACH_CONTAINER(&queue->active, wi, dlnode) {
/*
* item_lessthan(a, b) == true means a has lower priority than b
* !item_lessthan(a, b) counts all work items with higher or
* equal priority
*/
if (!item_lessthan(wi, item)) {
n_beaten_by++;
}
}
if (n_beaten_by >= active_target) {
/* Too many already have higher priority, not preempting */
goto out;
}
/* Grab a thread, set its priority and queue it. If there are
* no threads available to unpend, this is a soft runtime
* error: we are breaking our promise about run order.
* Complain.
*/
struct k_thread *th = z_unpend_first_thread(&queue->waitq);
if (th == NULL) {
LOG_WRN("Out of worker threads, priority guarantee violated");
goto out;
}
set_prio(th, item);
z_ready_thread(th);
z_reschedule(&queue->lock, k);
return;
out:
k_spin_unlock(&queue->lock, k);
}
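
/* Cancel a still-pending item.  Returns true (and signals done_sem) if
 * the item was removed from the pending tree; an item that a worker
 * has already picked up cannot be cancelled here, and false is
 * returned.
 */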
bool k_p4wq_cancel(struct k_p4wq *queue, struct k_p4wq_work *item)
{
k_spinlock_key_t k = k_spin_lock(&queue->lock);
bool ret = rb_contains(&queue->queue, &item->rbnode);
if (ret) {
rb_remove(&queue->queue, &item->rbnode);
k_sem_give(&item->done_sem);
}
k_spin_unlock(&queue->lock, k);
return ret;
}