Skip to content

Commit

Permalink
speech-synthesis: update thr-pool (#842)
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas authored and yorkie committed Jul 21, 2019
1 parent 3108c59 commit 3f9b4df
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 67 deletions.
2 changes: 1 addition & 1 deletion packages/@yodaos/speech-synthesis/src/pcm-player.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,6 @@ void PcmPlayer::cancel() {
RKLogw("flush data error(%d): %s", err, pa_strerror(err));
}
/** clears pending writes */
tp.finish();
tp.clear();
this->end();
}
200 changes: 134 additions & 66 deletions packages/@yodaos/speech-synthesis/src/thr-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,67 @@
#include <chrono>
#include <list>
#include <vector>
#include <algorithm>

#define THRPOOL_INITIALIZED 1
#define THRPOOL_MAX_THREADS 1024
#define TASKTHREAD_FLAG_CREATED 1
#define TASKTHREAD_FLAG_SHOULD_EXIT 2
#define TASK_OP_QUEUE 0
#define TASK_OP_DOING 1
#define TASK_OP_DONE 2
#define TASK_OP_DISCARD 3

class ThreadPool {
private:
typedef std::function<void()> TaskFunc;
/// \param op 0: queue 1: doing 2: done 3: discard
typedef std::function<void(int32_t op)> TaskCallback;

class TaskInfo {
public:
TaskInfo() {
}

TaskInfo(TaskFunc f, TaskCallback c) : func(f), cb(c) {
}

TaskFunc func;
TaskCallback cb;
};

class TaskThread {
public:
TaskThread() {
}

TaskThread(const TaskThread&& o) {
TaskThread(const TaskThread& o) {
}

void set_thr_pool(ThreadPool* pool) {
the_pool = pool;
void setThreadPool(ThreadPool* pool) {
thePool = pool;
}

void do_task(TaskFunc& func) {
std::unique_lock<std::mutex> locker(thr_mutex);
void awake() {
std::unique_lock<std::mutex> locker(thrMutex);
if ((flags & TASKTHREAD_FLAG_CREATED) == 0 &&
(flags & TASKTHREAD_FLAG_SHOULD_EXIT) == 0) {
// init
thr = std::thread([this]() { this->run(); });
thr = std::thread([this]() { run(); });
flags |= TASKTHREAD_FLAG_CREATED;
thr_cond.wait(locker);
}
// launch task
task_func = func;
thr_cond.notify_one();
}

void exit() {
std::unique_lock<std::mutex> locker(thr_mutex);
void work() {
std::lock_guard<std::mutex> locker(thrMutex);
thrCond.notify_one();
}

void sleep() {
std::unique_lock<std::mutex> locker(thrMutex);
flags |= TASKTHREAD_FLAG_SHOULD_EXIT;
if (thr.joinable()) {
thr_cond.notify_one();
thrCond.notify_one();
locker.unlock();
thr.join();
locker.lock();
Expand All @@ -55,95 +77,141 @@ class ThreadPool {

private:
void run() {
std::unique_lock<std::mutex> locker(thr_mutex);
thr_cond.notify_one();
std::unique_lock<std::mutex> locker(thrMutex);
TaskInfo task;

while ((flags & TASKTHREAD_FLAG_SHOULD_EXIT) == 0) {
if (task_func) {
task_func();
task_func = the_pool->get_pending_task();
if (task_func)
continue;
else
the_pool->push_idle_thread(this);
if (thePool->getPendingTask(task)) {
if (task.cb)
task.cb(TASK_OP_DOING);
if (task.func)
task.func();
if (task.cb)
task.cb(TASK_OP_DONE);
} else {
thePool->pushIdleThread(this);
thrCond.wait(locker);
}
thr_cond.wait(locker);
}
}

private:
ThreadPool* the_pool = nullptr;
ThreadPool* thePool{ nullptr };
std::thread thr;
std::mutex thr_mutex;
std::condition_variable thr_cond;
TaskFunc task_func;
uint32_t flags = 0;
std::mutex thrMutex;
std::condition_variable thrCond;
uint32_t flags{ 0 };
};

public:
explicit ThreadPool(uint32_t max) {
thread_array.resize(max);
ThreadPool() {
}

uint32_t i;
for (i = 0; i < max; ++i) {
thread_array[i].set_thr_pool(this);
}
init_idle_threads();
explicit ThreadPool(uint32_t max) {
init(max);
}

~ThreadPool() {
finish();
}

void push(TaskFunc task) {
std::lock_guard<std::mutex> locker(task_mutex);
if (idle_threads.empty()) {
pending_tasks.push_back(task);
} else {
idle_threads.front()->do_task(task);
idle_threads.pop_front();
void init(uint32_t max) {
std::lock_guard<std::mutex> locker(poolMutex);
if (max == 0 || max > THRPOOL_MAX_THREADS || status & THRPOOL_INITIALIZED)
return;
status |= THRPOOL_INITIALIZED;
threadArray.resize(max);

uint32_t i;
for (i = 0; i < max; ++i) {
threadArray[i].setThreadPool(this);
}
initSleepThreads();
}

void push(TaskFunc task, TaskCallback cb = nullptr) {
std::lock_guard<std::mutex> locker(poolMutex);
taskMutex.lock();
pendingTasks.push_back({ task, cb });
taskMutex.unlock();
if (cb)
cb(TASK_OP_QUEUE);
if (!idleThreads.empty()) {
auto thr = idleThreads.front();
idleThreads.pop_front();
thr->work();
} else if (!sleepThreads.empty()) {
auto thr = sleepThreads.front();
sleepThreads.pop_front();
thr->awake();
}
}

void finish() {
size_t sz = thread_array.size();
size_t i;
for (i = 0; i < sz; ++i) {
thread_array[i].exit();
std::lock_guard<std::mutex> locker(poolMutex);
std::unique_lock<std::mutex> taskLocker(taskMutex);
while (!pendingTasks.empty()) {
tasksDone.wait(taskLocker);
}
std::lock_guard<std::mutex> locker(task_mutex);
pending_tasks.clear();
taskLocker.unlock();
clearNolock();
}

void clear() {
std::lock_guard<std::mutex> locker(poolMutex);
clearNolock();
}

private:
void init_idle_threads() {
std::lock_guard<std::mutex> locker(task_mutex);
size_t sz = thread_array.size();
void initSleepThreads() {
size_t sz = threadArray.size();
size_t i;
for (i = 0; i < sz; ++i) {
idle_threads.push_back(thread_array.data() + i);
sleepThreads.push_back(threadArray.data() + i);
}
}

bool getPendingTask(TaskInfo& task) {
std::lock_guard<std::mutex> locker(taskMutex);
if (pendingTasks.empty()) {
task.func = nullptr;
task.cb = nullptr;
tasksDone.notify_one();
return false;
}
task = pendingTasks.front();
pendingTasks.pop_front();
return true;
}

TaskFunc get_pending_task() {
std::lock_guard<std::mutex> locker(task_mutex);
if (pending_tasks.empty())
return nullptr;
TaskFunc func = pending_tasks.front();
pending_tasks.pop_front();
return func;
void pushIdleThread(TaskThread* thr) {
std::lock_guard<std::mutex> locker(taskMutex);
idleThreads.push_back(thr);
}

void push_idle_thread(TaskThread* thr) {
std::lock_guard<std::mutex> locker(task_mutex);
idle_threads.push_back(thr);
void clearNolock() {
size_t sz = threadArray.size();
size_t i;
for (i = 0; i < sz; ++i) {
threadArray[i].sleep();
}
for_each(pendingTasks.begin(), pendingTasks.end(), [](TaskInfo& task) {
if (task.cb)
task.cb(TASK_OP_DISCARD);
});
pendingTasks.clear();
initSleepThreads();
}

private:
std::list<TaskFunc> pending_tasks;
std::list<TaskThread*> idle_threads;
std::mutex task_mutex;
std::vector<TaskThread> thread_array;
std::list<TaskInfo> pendingTasks;
std::list<TaskThread*> idleThreads;
std::list<TaskThread*> sleepThreads;
std::mutex poolMutex;
std::mutex taskMutex;
std::condition_variable tasksDone;
std::vector<TaskThread> threadArray;
uint32_t status{ 0 };

friend TaskThread;
};

0 comments on commit 3f9b4df

Please sign in to comment.