Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Pithikos committed Jan 18, 2017
2 parents 2ba0bd5 + da2c0fe commit 154e583
Showing 1 changed file with 63 additions and 72 deletions.
135 changes: 63 additions & 72 deletions thpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ typedef struct thpool_{
volatile int num_threads_working; /* threads currently working */
pthread_mutex_t thcount_lock; /* used for thread count etc */
pthread_cond_t threads_all_idle; /* signal to thpool_wait */
jobqueue* jobqueue_p; /* pointer to the job queue */
jobqueue jobqueue; /* job queue */
} thpool_;


Expand All @@ -90,11 +90,11 @@ static void* thread_do(struct thread* thread_p);
static void thread_hold();
static void thread_destroy(struct thread* thread_p);

static int jobqueue_init(thpool_* thpool_p);
static void jobqueue_clear(thpool_* thpool_p);
static void jobqueue_push(thpool_* thpool_p, struct job* newjob_p);
static struct job* jobqueue_pull(thpool_* thpool_p);
static void jobqueue_destroy(thpool_* thpool_p);
static int jobqueue_init(jobqueue* jobqueue_p);
static void jobqueue_clear(jobqueue* jobqueue_p);
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
static struct job* jobqueue_pull(jobqueue* jobqueue_p);
static void jobqueue_destroy(jobqueue* jobqueue_p);

static void bsem_init(struct bsem *bsem_p, int value);
static void bsem_reset(struct bsem *bsem_p);
Expand Down Expand Up @@ -130,7 +130,7 @@ struct thpool_* thpool_init(int num_threads){
thpool_p->num_threads_working = 0;

/* Initialise the job queue */
if (jobqueue_init(thpool_p) == -1){
if (jobqueue_init(&thpool_p->jobqueue) == -1){
fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
free(thpool_p);
return NULL;
Expand All @@ -140,8 +140,7 @@ struct thpool_* thpool_init(int num_threads){
thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *));
if (thpool_p->threads == NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n");
jobqueue_destroy(thpool_p);
free(thpool_p->jobqueue_p);
jobqueue_destroy(&thpool_p->jobqueue);
free(thpool_p);
return NULL;
}
Expand Down Expand Up @@ -179,9 +178,7 @@ int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
newjob->arg=arg_p;

/* add job to queue */
pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
jobqueue_push(thpool_p, newjob);
pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
jobqueue_push(&thpool_p->jobqueue, newjob);

return 0;
}
Expand All @@ -190,7 +187,7 @@ int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
/* Wait until all jobs have finished */
void thpool_wait(thpool_* thpool_p){
pthread_mutex_lock(&thpool_p->thcount_lock);
while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working) {
while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
}
pthread_mutex_unlock(&thpool_p->thcount_lock);
Expand All @@ -213,21 +210,19 @@ void thpool_destroy(thpool_* thpool_p){
double tpassed = 0.0;
time (&start);
while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue_p->has_jobs);
bsem_post_all(thpool_p->jobqueue.has_jobs);
time (&end);
tpassed = difftime(end,start);
}

/* Poll remaining threads */
while (thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue_p->has_jobs);
bsem_post_all(thpool_p->jobqueue.has_jobs);
sleep(1);
}

/* Job queue cleanup */
jobqueue_destroy(thpool_p);
free(thpool_p->jobqueue_p);

jobqueue_destroy(&thpool_p->jobqueue);
/* Deallocs */
int n;
for (n=0; n < threads_total; n++){
Expand Down Expand Up @@ -338,7 +333,7 @@ static void* thread_do(struct thread* thread_p){

while(threads_keepalive){

bsem_wait(thpool_p->jobqueue_p->has_jobs);
bsem_wait(thpool_p->jobqueue.has_jobs);

if (threads_keepalive){

Expand All @@ -347,12 +342,9 @@ static void* thread_do(struct thread* thread_p){
pthread_mutex_unlock(&thpool_p->thcount_lock);

/* Read job from queue and execute it */
void (*func_buff)(void* arg);
void* arg_buff;
job* job_p;
pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
job_p = jobqueue_pull(thpool_p);
pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
void*(*func_buff)(void* arg);
void* arg_buff;
job* job_p = jobqueue_pull(&thpool_p->jobqueue);
if (job_p) {
func_buff = job_p->function;
arg_buff = job_p->arg;
Expand Down Expand Up @@ -390,105 +382,104 @@ static void thread_destroy (thread* thread_p){


/* Initialize queue */
static int jobqueue_init(thpool_* thpool_p){
static int jobqueue_init(jobqueue* jobqueue_p){
jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;

thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue));
if (thpool_p->jobqueue_p == NULL){
jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
if (jobqueue_p->has_jobs == NULL){
return -1;
}
thpool_p->jobqueue_p->len = 0;
thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL;

thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
if (thpool_p->jobqueue_p->has_jobs == NULL){
return -1;
}

pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL);
bsem_init(thpool_p->jobqueue_p->has_jobs, 0);
pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
bsem_init(jobqueue_p->has_jobs, 0);

return 0;
}


/* Clear the queue */
static void jobqueue_clear(thpool_* thpool_p){
static void jobqueue_clear(jobqueue* jobqueue_p){

while(thpool_p->jobqueue_p->len){
free(jobqueue_pull(thpool_p));
while(jobqueue_p->len){
free(jobqueue_pull(jobqueue_p));
}

thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL;
bsem_reset(thpool_p->jobqueue_p->has_jobs);
thpool_p->jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
bsem_reset(jobqueue_p->has_jobs);
jobqueue_p->len = 0;

}


/* Add (allocated) job to queue
*
* Notice: Caller MUST hold a mutex
*/
static void jobqueue_push(thpool_* thpool_p, struct job* newjob){
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){

pthread_mutex_lock(&jobqueue_p->rwmutex);
newjob->prev = NULL;

switch(thpool_p->jobqueue_p->len){
switch(jobqueue_p->len){

case 0: /* if no jobs in queue */
thpool_p->jobqueue_p->front = newjob;
thpool_p->jobqueue_p->rear = newjob;
jobqueue_p->front = newjob;
jobqueue_p->rear = newjob;
break;

default: /* if jobs in queue */
thpool_p->jobqueue_p->rear->prev = newjob;
thpool_p->jobqueue_p->rear = newjob;

jobqueue_p->rear->prev = newjob;
jobqueue_p->rear = newjob;
}
thpool_p->jobqueue_p->len++;

bsem_post(thpool_p->jobqueue_p->has_jobs);
jobqueue_p->len++;

bsem_post(jobqueue_p->has_jobs);
pthread_mutex_unlock(&jobqueue_p->rwmutex);
}


/* Get first job from queue(removes it from queue)
<<<<<<< HEAD
*
* Notice: Caller MUST hold a mutex
=======
>>>>>>> da2c0fe45e43ce0937f272c8cd2704bdc0afb490
*/
static struct job* jobqueue_pull(thpool_* thpool_p){
static struct job* jobqueue_pull(jobqueue* jobqueue_p){

job* job_p;
job_p = thpool_p->jobqueue_p->front;

switch(thpool_p->jobqueue_p->len){
pthread_mutex_lock(&jobqueue_p->rwmutex);
job* job_p = jobqueue_p->front;

switch(jobqueue_p->len){

case 0: /* if no jobs in queue */
break;

case 1: /* if one job in queue */
thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL;
thpool_p->jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
jobqueue_p->len = 0;
break;

default: /* if >1 jobs in queue */
thpool_p->jobqueue_p->front = job_p->prev;
thpool_p->jobqueue_p->len--;
jobqueue_p->front = job_p->prev;
jobqueue_p->len--;
/* more than one job in queue -> post it */
bsem_post(thpool_p->jobqueue_p->has_jobs);

bsem_post(jobqueue_p->has_jobs);
}

pthread_mutex_unlock(&jobqueue_p->rwmutex);
return job_p;
}


/* Free all queue resources back to the system */
static void jobqueue_destroy(thpool_* thpool_p){
jobqueue_clear(thpool_p);
free(thpool_p->jobqueue_p->has_jobs);
static void jobqueue_destroy(jobqueue* jobqueue_p){
jobqueue_clear(jobqueue_p);
free(jobqueue_p->has_jobs);
}


Expand Down

0 comments on commit 154e583

Please sign in to comment.