Skip to content

Commit

Permalink
Working implementation of weft thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
lightsighter committed Dec 30, 2014
1 parent 146ac8d commit 32f29ec
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ all: $(OUTFILE)

GCC = g++
CC_FLAGS = -ggdb -Wall
LD_FLAGS = -ggdb
LD_FLAGS = -ggdb -lpthread

FILES = weft.cc \
program.cc \
Expand Down
12 changes: 11 additions & 1 deletion src/program.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ void Program::convert_to_instructions(int max_num_threads,
}
}

Thread::Thread(void)
Thread::Thread(unsigned tid, Program *p)
: thread_id(tid), program(p)
{
}

Thread::~Thread(void)
{
}

void Thread::emulate(void)
{
}

11 changes: 10 additions & 1 deletion src/program.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ class Program {

class Thread {
public:
Thread(void);
Thread(unsigned thread_id, Program *p);
Thread(const Thread &rhs) : thread_id(0), program(NULL) { assert(false); }
~Thread(void);
public:
Thread& operator=(const Thread &rhs) { assert(false); return *this; }
public:
void emulate(void);
public:
const unsigned thread_id;
Program *const program;
};

#endif //__PROGRAM_H__
180 changes: 165 additions & 15 deletions src/weft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,36 @@
#include <cstring>
#include <cstdlib>

Weft::Weft(void)
Weft::Weft(int argc, char **argv)
: file_name(NULL), max_num_threads(-1),
thread_pool_size(1), verbose(false)
thread_pool_size(1), verbose(false),
program(NULL), worker_threads(NULL),
pending_count(0)
{
parse_inputs(argc, argv);
start_threadpool();
}

void Weft::verify(int argc, char **argv)
Weft::~Weft(void)
{
parse_inputs(argc, argv);
Program program;
parse_ptx(program);
assert(max_num_threads > 0);
std::vector<Thread> threads(max_num_threads);
emulate_threads(program, threads);
stop_threadpool();
if (program != NULL)
{
delete program;
program = NULL;
}
for (std::vector<Thread*>::iterator it = threads.begin();
it != threads.end(); it++)
{
delete (*it);
}
threads.clear();
}

void Weft::verify(void)
{
parse_ptx();
emulate_threads();
}

void Weft::parse_inputs(int argc, char **argv)
Expand Down Expand Up @@ -81,12 +97,14 @@ void Weft::report_usage(int error, const char *error_str)
exit(error);
}

void Weft::parse_ptx(Program &program)
void Weft::parse_ptx(void)
{
assert(file_name != NULL);
if (verbose)
fprintf(stdout,"WEFT INFO: Parsing file %s...\n", file_name);
max_num_threads = program.parse_ptx_file(file_name, max_num_threads);
assert(program == NULL);
program = new Program();
max_num_threads = program->parse_ptx_file(file_name, max_num_threads);
if (max_num_threads <= 0)
{
fprintf(stderr,"WEFT ERROR %d: Failed to find max number of threads "
Expand All @@ -96,21 +114,153 @@ void Weft::parse_ptx(Program &program)
exit(WEFT_ERROR_NO_THREAD_COUNT);
}
if (verbose)
program.report_statistics();
program->report_statistics();
}

void Weft::emulate_threads(Program &program, std::vector<Thread> &threads)
void Weft::emulate_threads(void)
{
if (verbose)
fprintf(stdout,"WEFT INFO: Emulating %d GPU threads "
"with %d CPU threads...\n",
max_num_threads, thread_pool_size);
assert(max_num_threads > 0);
threads.resize(max_num_threads, NULL);
initialize_count(max_num_threads);
for (int i = 0; i < max_num_threads; i++)
{
threads[i] = new Thread(i, program);
EmulateTask *task = new EmulateTask(threads[i]);
enqueue_task(task);
}
wait_until_done();
}

void Weft::start_threadpool(void)
{
assert(thread_pool_size > 0);
PTHREAD_SAFE_CALL( pthread_mutex_init(&count_lock, NULL) );
PTHREAD_SAFE_CALL( pthread_cond_init(&count_cond, NULL) );
PTHREAD_SAFE_CALL( pthread_mutex_init(&queue_lock, NULL) );
PTHREAD_SAFE_CALL( pthread_cond_init(&queue_cond, NULL) );
assert(worker_threads == NULL);
worker_threads = (pthread_t*)malloc(thread_pool_size * sizeof(pthread_t));
for (int i = 0; i < thread_pool_size; i++)
{
PTHREAD_SAFE_CALL( pthread_create(worker_threads+i, NULL,
Weft::worker_loop, this) );
}
}

void Weft::stop_threadpool(void)
{
// Wake up all the worker threads so that they exit
PTHREAD_SAFE_CALL( pthread_mutex_lock(&queue_lock) );
PTHREAD_SAFE_CALL( pthread_cond_broadcast(&queue_cond) );
PTHREAD_SAFE_CALL( pthread_mutex_unlock(&queue_lock) );
for (int i = 0; i < thread_pool_size; i++)
{
PTHREAD_SAFE_CALL( pthread_join(worker_threads[i], NULL) ) ;
}
free(worker_threads);
worker_threads = NULL;
PTHREAD_SAFE_CALL( pthread_mutex_destroy(&count_lock) );
PTHREAD_SAFE_CALL( pthread_cond_destroy(&count_cond) );
PTHREAD_SAFE_CALL( pthread_mutex_destroy(&queue_lock) );
PTHREAD_SAFE_CALL( pthread_cond_destroy(&queue_cond) );
}

void Weft::initialize_count(unsigned count)
{
PTHREAD_SAFE_CALL( pthread_mutex_lock(&count_lock) );
assert(pending_count == 0);
pending_count = count;
PTHREAD_SAFE_CALL( pthread_mutex_unlock(&count_lock) );
}

void Weft::wait_until_done(void)
{
PTHREAD_SAFE_CALL( pthread_mutex_lock(&count_lock) );
if (pending_count > 0)
{
PTHREAD_SAFE_CALL( pthread_cond_wait(&count_cond, &count_lock) );
}
PTHREAD_SAFE_CALL( pthread_mutex_unlock(&count_lock) );
}

void Weft::enqueue_task(WeftTask *task)
{
PTHREAD_SAFE_CALL( pthread_mutex_lock(&queue_lock) );
queue.push_back(task);
PTHREAD_SAFE_CALL( pthread_cond_signal(&queue_cond) );
PTHREAD_SAFE_CALL( pthread_mutex_unlock(&queue_lock) );
}

WeftTask* Weft::dequeue_task(void)
{
WeftTask *result = NULL;
PTHREAD_SAFE_CALL( pthread_mutex_lock(&queue_lock) );
if (queue.empty())
{
PTHREAD_SAFE_CALL( pthread_cond_wait(&queue_cond, &queue_lock) );
// Check to see if the queue is still empty after waiting
// If it is then we know we are done
if (!queue.empty())
{
result = queue.front();
queue.pop_front();
}
}
else
{
result = queue.front();
queue.pop_front();
}
PTHREAD_SAFE_CALL( pthread_mutex_unlock(&queue_lock) );
return result;
}

void Weft::complete_task(WeftTask *task)
{
PTHREAD_SAFE_CALL( pthread_mutex_lock(&count_lock) );
assert(pending_count > 0);
pending_count--;
if (pending_count == 0)
PTHREAD_SAFE_CALL( pthread_cond_signal(&count_cond) );
PTHREAD_SAFE_CALL( pthread_mutex_unlock(&count_lock) );
// Clean up the task
delete task;
}

/*static*/
void* Weft::worker_loop(void *arg)
{
Weft *weft = (Weft*)arg;
while (true)
{
WeftTask *task = weft->dequeue_task();
// If we ever get a NULL task then we are done
if (task == NULL)
break;
task->execute();
weft->complete_task(task);
}
pthread_exit(NULL);
}

EmulateTask::EmulateTask(Thread *t)
: thread(t)
{
}

void EmulateTask::execute(void)
{
thread->emulate();
}

int main(int argc, char **argv)
{
Weft weft;
weft.verify(argc, argv);
Weft weft(argc, argv);
weft.verify();
return 0;
}

65 changes: 61 additions & 4 deletions src/weft.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,21 @@
#ifndef __WEFT_H__
#define __WEFT_H__

#include <cstdio>
#include <cassert>
#include <pthread.h>
#include <deque>
#include <vector>

#define PTHREAD_SAFE_CALL(cmd) \
{ \
int ret = (cmd); \
if (ret != 0) { \
fprintf(stderr,"PTHREAD error: %s = %d (%s)\n", #cmd, ret, strerror(ret)); \
assert(false); \
} \
}

enum {
WEFT_ERROR_NO_FILE_NAME,
WEFT_ERROR_FILE_OPEN,
Expand All @@ -14,21 +27,65 @@ enum {
class Program;
class Thread;

class WeftTask {
public:
virtual ~WeftTask(void) { }
virtual void execute(void) = 0;
};

class EmulateTask : public WeftTask {
public:
EmulateTask(Thread *thread);
EmulateTask(const EmulateTask &rhs) : thread(NULL) { assert(false); }
virtual ~EmulateTask(void) { }
public:
EmulateTask& operator=(const EmulateTask &rhs) { assert(false); return *this; }
public:
virtual void execute(void);
public:
Thread *const thread;
};

class Weft {
public:
Weft(void);
Weft(int argc, char **argv);
~Weft(void);
public:
void verify(int argc, char **argv);
void verify(void);
protected:
void parse_inputs(int argc, char **argv);
void report_usage(int error, const char *error_str);
void parse_ptx(Program &p);
void emulate_threads(Program &p, std::vector<Thread> &threads);
void parse_ptx(void);
void emulate_threads(void);
protected:
void start_threadpool(void);
void stop_threadpool(void);
void initialize_count(unsigned count);
void wait_until_done(void);
void enqueue_task(WeftTask *task);
public:
WeftTask* dequeue_task(void);
void complete_task(WeftTask *task);
public:
static void* worker_loop(void *arg);
protected:
const char *file_name;
int max_num_threads;
int thread_pool_size;
bool verbose;
protected:
Program *program;
std::vector<Thread*> threads;
protected:
pthread_t *worker_threads;
protected:
pthread_mutex_t count_lock;
pthread_cond_t count_cond;
unsigned int pending_count;
protected:
pthread_mutex_t queue_lock;
pthread_cond_t queue_cond;
std::deque<WeftTask*> queue;
};

#endif // __WEFT_H__

0 comments on commit 32f29ec

Please sign in to comment.