-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththreadpool.h
113 lines (103 loc) · 2.85 KB
/
threadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#ifndef _THREADPOOL_
#define _THREADPOOL_
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
typedef struct task
{
void *(*func)(void *);
void *arg;
struct task *next;
} thread_task;
typedef struct
{
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;
thread_task *queue_head;
pthread_t *tids;
int shutdown;
int max_thread_num;
int cur_queue_size;
}thread_pool;
int pool_init(thread_pool *pool, int max_thread_num);
int pool_add_task(thread_pool *pool, void *(*func)(void *), void *arg);
int pool_destroy(thread_pool *pool);
void *thread_main(void *arg);
int pool_init(thread_pool *pool, int max_thread_num)
{
pthread_mutex_init(&(pool->queue_lock), NULL);
pthread_cond_init(&(pool->queue_ready), NULL);
pool->queue_head = NULL;
pool->shutdown = 0;
pool->max_thread_num = max_thread_num;
pool->cur_queue_size = 0;
pool->tids = (pthread_t*)malloc(sizeof(pthread_t)*max_thread_num);
for(int i=0; i<max_thread_num; ++i)
pthread_create(&(pool->tids[i]), NULL, &thread_main, (void*) pool);
return 0;
}
void *thread_main(void *void_pool)
{
thread_pool *pool = (thread_pool*)void_pool;
for(;;)
{
pthread_mutex_lock(&(pool->queue_lock));
while(pool->cur_queue_size == 0 && !pool->shutdown)
{
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
}
if(pool->shutdown)
{
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL);
}
pool->cur_queue_size--;
thread_task *task = pool->queue_head;
pool->queue_head = task->next;
pthread_mutex_unlock(&(pool->queue_lock));
(*(task->func))(task->arg);
free(task);
}
pthread_exit(NULL);
}
int pool_add_task(thread_pool *pool, void *(*func)(void *), void *arg)
{
thread_task* task = (thread_task*)malloc(sizeof(thread_task));
task->func = func;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock(&(pool->queue_lock));
thread_task *cur = pool->queue_head;
if(cur != NULL)
{
while(cur->next != NULL)
cur=cur->next;
cur->next = task;
}
else
pool->queue_head = task;
++pool->cur_queue_size;
pthread_mutex_unlock(&(pool->queue_lock));
pthread_cond_signal(&(pool->queue_ready));
return 0;
}
int pool_destroy(thread_pool *pool)
{
if(pool->shutdown) return -1;
pool->shutdown = 1;
pthread_cond_broadcast(&(pool->queue_ready));
for(int i=0; i<pool->max_thread_num; ++i)
pthread_join(pool->tids[i], NULL);
free(pool->tids);
thread_task *cur = NULL;
while(pool->queue_head != NULL)
{
cur = pool->queue_head;
pool->queue_head = cur->next;
free(cur);
}
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready));
return 0;
}
#endif