forked from hrydgard/ppsspp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTestThreadManager.cpp
155 lines (124 loc) · 3.76 KB
/
TestThreadManager.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>
#include "Common/Log.h"
#include "Common/TimeUtil.h"
#include "Common/Thread/Barrier.h"
#include "Common/Thread/ThreadManager.h"
#include "Common/Thread/Channel.h"
#include "Common/Thread/Promise.h"
#include "Common/Thread/ParallelLoop.h"
#include "Common/Thread/ThreadUtil.h"
#include "Common/Thread/Waitable.h"
#include "UnitTest.h"
// Minimal payload handed between threads by the promise and mailbox tests.
struct ResultObject
{
	// Set to true by the producers in this file; checked by the mailbox test.
	bool ok;
};
// Producer used as the promise's work function: waits 250 ms, logs the ID of
// the thread it ran on, and returns a freshly heap-allocated ResultObject with
// ok == true.
ResultObject *ResultProducer() {
	sleep_ms(250);

	const auto producerThreadId = GetCurrentThreadIdForDebug();
	printf("result produced: thread %d\n", producerThreadId);

	ResultObject *produced = new ResultObject{ true };
	return produced;
}
// Round-trips a single ResultObject through a Mailbox on the calling thread:
// send, wait for it, verify it, then free the payload and drop the mailbox
// reference. Always returns true; a failure surfaces through _assert_.
bool TestMailbox() {
	auto *box = new Mailbox<ResultObject *>();

	// The value is already in the mailbox by the time we wait for it.
	box->Send(new ResultObject{ true });
	ResultObject *data = box->Wait();
	_assert_(data && data->ok);

	// The received payload is freed here; the mailbox itself is handed back
	// through Release() rather than deleted directly.
	delete data;
	box->Release();
	return true;
}
// Work function for the parallel-loop tests: waits 30 ms, then logs the
// sub-range it was handed together with the ID of the executing thread.
void rangeFunc(int lower, int upper) {
	sleep_ms(30);

	const auto workerThreadId = GetCurrentThreadIdForDebug();
	printf(" - range %d-%d (thread %d)\n", lower, upper, workerThreadId);
}
// Exercises the parallel range loop helpers with rangeFunc as the work
// function. There are no assertions here, so this passes unless something is
// badly broken; the interesting part is the logged output showing how the
// ranges were split and which threads ran them.
bool TestParallelLoop(ThreadManager *threadMan) {
	printf("tester thread ID: %d\n", GetCurrentThreadIdForDebug());

	// Non-blocking variant: we get a waitable back and could do other work
	// before waiting on it.
	printf("waitable test\n");
	WaitableCounter *firstPending = ParallelRangeLoopWaitable(threadMan, rangeFunc, 0, 7, 1, TaskPriority::HIGH);
	firstPending->WaitAndRelease();
	// The first loop has completed at this point.

	// Blocking variant with a range that leaves stragglers.
	printf("blocking test #1 [0-65)\n");
	ParallelRangeLoop(threadMan, rangeFunc, 0, 65, 1);

	// Blocking variant with a relatively large minimum size.
	printf("blocking test #2 [0-100)\n");
	ParallelRangeLoop(threadMan, rangeFunc, 0, 100, 40);

	// Minimum size (40) larger than the whole range (20 items).
	printf("waitable test [10-30)\n");
	WaitableCounter *secondPending = ParallelRangeLoopWaitable(threadMan, rangeFunc, 10, 30, 40, TaskPriority::LOW);
	secondPending->WaitAndRelease();

	return true;
}
// Number of std::threads spawned by the scheduling stress test.
constexpr size_t THREAD_COUNT = 9;
// Number of tasks each of those threads enqueues.
constexpr size_t ITERATIONS = 40000;

// Incremented once per executed IncrementTask.
static std::atomic<int> g_atomicCounter{ 0 };
// Thread manager the stress-test threads enqueue onto; set by TestThreadManager().
static ThreadManager *g_threadMan = nullptr;
// One arrival per worker thread plus one for the test thread itself.
static CountingBarrier g_barrier(THREAD_COUNT + 1);
class IncrementTask : public Task {
public:
IncrementTask(TaskType type, LimitedWaitable *waitable) : type_(type), waitable_(waitable) {}
~IncrementTask() {}
TaskType Type() const override { return type_; }
TaskPriority Priority() const override {
return TaskPriority::NORMAL;
}
void Run() override {
g_atomicCounter++;
waitable_->Notify();
}
private:
TaskType type_;
LimitedWaitable *waitable_;
};
void ThreadFunc() {
for (int i = 0; i < ITERATIONS; i++) {
auto threadWaitable = new LimitedWaitable();
g_threadMan->EnqueueTask(new IncrementTask((i & 1) ? TaskType::CPU_COMPUTE : TaskType::IO_BLOCKING, threadWaitable));
threadWaitable->WaitAndRelease();
}
g_barrier.Arrive();
}
// Stress test: THREAD_COUNT threads each push ITERATIONS increment tasks
// through g_threadMan, and afterwards the shared counter must equal
// THREAD_COUNT * ITERATIONS. Prints the elapsed time on success.
bool TestMultithreadedScheduling() {
	g_atomicCounter = 0;

	auto start = Instant::Now();

	std::vector<std::thread> threads;
	threads.reserve(THREAD_COUNT);
	for (size_t i = 0; i < THREAD_COUNT; i++) {
		threads.emplace_back(ThreadFunc);
	}

	// Just testing the barrier: the test thread is the (THREAD_COUNT + 1)th
	// participant, alongside the workers that arrive once their loop is done.
	g_barrier.Arrive();
	// OK, all are done.

	// Join before checking the result. If the expectation macro bails out of
	// this function on a mismatch, destroying still-joinable std::thread
	// objects would call std::terminate instead of reporting the failure.
	for (std::thread &worker : threads) {
		worker.join();
	}
	threads.clear();

	EXPECT_EQ_INT(g_atomicCounter, THREAD_COUNT * ITERATIONS);

	// Trailing newline added so following output starts on its own line, as
	// with every other printf in this file.
	printf("Stress test elapsed: %0.2f\n", start.Elapsed());
	return true;
}
// Entry point for the thread manager tests. Sets up a local ThreadManager,
// spawns a promise in the background, then runs the parallel-loop, mailbox
// and multithreaded scheduling tests in that order. Returns true only if all
// of them pass.
bool TestThreadManager() {
	ThreadManager manager;
	manager.Init(8, 1);

	// The stress-test threads reach the manager through this global.
	g_threadMan = &manager;

	// Kick off the producer first so it runs concurrently with the loop test.
	Promise<ResultObject *> *object(Promise<ResultObject *>::Spawn(&manager, &ResultProducer, TaskType::IO_BLOCKING));

	const bool loopsOk = TestParallelLoop(&manager);
	if (loopsOk) {
		sleep_ms(100);
	}

	// Always collect and delete the promise, even when the loop test failed;
	// returning early here used to leak it.
	ResultObject *result = object->BlockUntilReady();
	if (result) {
		// NOTE(review): ownership of `result` is not visible from this file,
		// so it is deliberately not deleted here - confirm against Promise.
		printf("Got result back!\n");
	}
	delete object;

	// Short-circuits exactly like the original chain of early returns.
	const bool allOk = loopsOk && TestMailbox() && TestMultithreadedScheduling();

	// `manager` dies when this function returns; don't leave the global
	// pointing at it.
	g_threadMan = nullptr;
	return allOk;
}