-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproduconsum.c
209 lines (179 loc) · 4.64 KB
/
produconsum.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#include <assert.h>
#include "threads.h"
#include <errno.h>
#include "log.h"
#include "produconsum.h"
#include "util.h"
#define DEBUG 0
/**
 * Simple implementation of producer-consumer pattern.
 *
 * Bookkeeping for a buffer of `size` units shared between a producer and a
 * consumer. The two position counters are kept modulo 2*size (see
 * pc_produce and pc_consumed); offsets into the buffer are obtained by
 * reducing them modulo size (see pc_getConsumerPosition and
 * pc_getProducerPosition).
 */
struct produconsum {
/* Capacity of the buffer, in caller-defined units. */
unsigned int size;
/* Producer position, advanced by pc_produce; wraps at 2*size. */
volatile unsigned int produced;
/* Consumer position, advanced by pc_consumed; wraps at 2*size. */
unsigned int consumed;
/* Set to 1 by pc_produceEnd; makes _consumeAny stop waiting. */
volatile int atEnd;
/* Protects the condition-variable wait/signal handshake. */
pthread_mutex_t mutex;
/* Set by _consumeAny while it checks or waits for data, cleared on return. */
volatile int consumerIsWaiting;
/* Signalled by the producer side to wake a waiting consumer. */
pthread_cond_t cond;
/* Label used in diagnostics; the pointer is stored, not copied. */
const char *name;
};
/**
 * Create a producer/consumer bookkeeping object for a buffer of `size` units.
 *
 * The new object starts empty (both counters at 0), not at end, and with no
 * consumer marked as waiting.
 *
 * @param size capacity of the buffer being tracked
 * @param name label used in diagnostics; the pointer is kept as-is, so the
 *             string must outlive the returned object
 * @return the newly allocated object
 *
 * NOTE(review): allocation goes through the project's MALLOC macro; whether
 * it can yield NULL is not visible here, and the pthread_*_init return values
 * are not checked.
 */
produconsum_t pc_makeProduconsum(unsigned int size, const char *name)
{
    produconsum_t self = MALLOC(struct produconsum);

    /* Identity and geometry. */
    self->name = name;
    self->size = size;

    /* Nothing produced, nothing consumed, stream still open. */
    self->produced = 0;
    self->consumed = 0;
    self->atEnd = 0;
    self->consumerIsWaiting = 0;

    /* Synchronisation primitives with default attributes. */
    pthread_mutex_init(&self->mutex, NULL);
    pthread_cond_init(&self->cond, NULL);

    return self;
}
/**
 * Wake a consumer that may be blocked on pc->cond.
 *
 * The signal is sent unconditionally and with pc->mutex held. An earlier
 * version skipped the lock unless pc->consumerIsWaiting was set, but that
 * flag is written by the consumer and read here without any memory barrier:
 * the consumer stores the flag and then loads the counters, while the
 * producer stores a counter and then loads the flag. Both sides can observe
 * the other's stale value, in which case the consumer goes to sleep and the
 * wake-up is never sent. Taking the mutex here orders this call against the
 * consumer's re-check of the counters under the same mutex, so either the
 * consumer sees the new data before sleeping or it is already waiting when
 * the signal is delivered.
 *
 * @param pc producer/consumer state whose consumer should be woken
 */
static void wakeConsumer(produconsum_t pc)
{
    pthread_mutex_lock(&pc->mutex);
    pthread_cond_signal(&pc->cond);
    pthread_mutex_unlock(&pc->mutex);
}
/**
 * Record that the producer has made `amount` more units available and wake
 * the consumer.
 *
 * We assume here that the producer never ever produces more than fits into
 * the buffer. To ensure this, use a second buffer, oriented in the other
 * direction.
 *
 * Positions are kept modulo 2*size so that a full buffer (fill == size) can
 * be told apart from an empty one (fill == 0). Two sanity checks guard the
 * assumption above; both report through udpc_fatal:
 *   1. a single call must not produce more than size;
 *   2. the resulting fill level must not exceed size.
 *
 * The fill level is computed explicitly rather than by comparing against
 * `consumed - size`: that subtraction wraps around in unsigned arithmetic
 * whenever consumed < size, which let a wrapped overrun slip through
 * (e.g. size 10, consumed 5, producer position going from 15 to 3).
 *
 * @param pc     producer/consumer state
 * @param amount number of units just produced
 */
void pc_produce(produconsum_t pc, unsigned int amount)
{
    unsigned int produced = pc->produced;
    unsigned int consumed = pc->consumed;
    unsigned int wrap = 2 * pc->size;
    unsigned int fill;

    if (amount > pc->size) {
        udpc_fatal(1, "Buffer overflow in produce %s: %u > %u \n",
                   pc->name, amount, pc->size);
    }

    produced += amount;
    if (produced >= wrap)
        produced -= wrap;

    /* Distance from consumer to producer on the 2*size ring. */
    if (produced < consumed)
        fill = produced + wrap - consumed;
    else
        fill = produced - consumed;

    if (fill > pc->size) {
        udpc_fatal(1, "Buffer overflow in produce %s: %u > %u [%u] \n",
                   pc->name, produced, consumed, pc->size);
    }

    pc->produced = produced;
    wakeConsumer(pc);
}
/**
 * Mark the stream as finished and wake the consumer.
 *
 * Once atEnd is set, _consumeAny stops waiting for more data and returns
 * whatever amount is currently available.
 *
 * @param pc producer/consumer state
 */
void pc_produceEnd(produconsum_t pc)
{
    pc->atEnd = 1;      /* no further data will be produced */
    wakeConsumer(pc);
}
/**
 * Number of units produced but not yet consumed.
 *
 * Both counters live on a ring of length 2*size, so when the producer
 * position is numerically behind the consumer position it has wrapped and
 * 2*size is added back.
 *
 * @param pc producer/consumer state
 * @return distance from the consumer position to the producer position
 */
static unsigned int getProducedAmount(produconsum_t pc)
{
    unsigned int head = pc->produced;
    unsigned int tail = pc->consumed;
    unsigned int pending = head - tail;

    /* Unsigned wrap-around in the subtraction is undone by this addition. */
    if (head < tail)
        pending += 2 * pc->size;

    return pending;
}
/**
 * Report how many units are waiting to be consumed, without blocking.
 *
 * @param pc producer/consumer state
 * @return the current result of getProducedAmount(pc)
 */
unsigned int pc_getWaiting(produconsum_t pc)
{
    unsigned int pending = getProducedAmount(pc);

    return pending;
}
/**
 * Block until at least minAmount units are available, the producer has
 * signalled end of data (pc->atEnd), or the timed wait expires.
 *
 * This only reports how much is available; pc->consumed is not modified
 * here (see pc_consumed for that).
 *
 * @param pc        producer/consumer state
 * @param minAmount amount to wait for
 * @param ts        when 0, wait with pthread_cond_wait (no timeout);
 *                  otherwise handed unchanged to pthread_cond_timedwait
 * @return the value of getProducedAmount(pc) when the wait ended; this may
 *         be less than minAmount if atEnd is set or the wait timed out
 */
static unsigned int _consumeAny(produconsum_t pc, unsigned int minAmount,
struct timespec *ts) {
unsigned int amount;
#if DEBUG
flprintf("%s: Waiting for %d bytes (%d:%d)\n",
pc->name, minAmount, pc->consumed, pc->produced);
#endif
/* Advertise that a consumer is (about to be) waiting before sampling the
 * counters.
 * NOTE(review): the flag and the counters are accessed here without holding
 * pc->mutex; volatile alone does not order these accesses between threads --
 * confirm this is acceptable on the target platforms. */
pc->consumerIsWaiting=1;
amount = getProducedAmount(pc);
/* Fast path: enough data (or end of stream) already, so return without
 * touching the mutex. */
if(amount >= minAmount || pc->atEnd) {
pc->consumerIsWaiting=0;
#if DEBUG
flprintf("%s: got %d bytes\n",pc->name, amount);
#endif
return amount;
}
/* Slow path: re-check under the mutex and sleep on the condition variable
 * until enough data is available or the stream has ended. */
pthread_mutex_lock(&pc->mutex);
while((amount=getProducedAmount(pc)) < minAmount && !pc->atEnd) {
#if DEBUG
flprintf("%s: ..Waiting for %d bytes (%d:%d)\n",
pc->name, minAmount, pc->consumed, pc->produced);
#endif
if(ts == 0)
pthread_cond_wait(&pc->cond, &pc->mutex);
else {
int r;
#if DEBUG
flprintf("Before timed wait\n");
#endif
r=pthread_cond_timedwait(&pc->cond, &pc->mutex, ts);
#if DEBUG
flprintf("After timed wait %d\n", r);
#endif
/* Timed out: report whatever is available now. Any other return value
 * (including errors) falls through to the loop condition again. */
if(r == ETIMEDOUT) {
amount=getProducedAmount(pc);
break;
}
}
}
pthread_mutex_unlock(&pc->mutex);
#if DEBUG
flprintf("%s: Got them %d (for %d) %d\n", pc->name,
amount, minAmount, pc->atEnd);
#endif
/* Done waiting: lower the flag again. */
pc->consumerIsWaiting=0;
return amount;
}
/**
 * Advance the consumer position by `amount` units.
 *
 * The position lives on a ring of length 2*size; when the advance reaches
 * or passes the end of that ring it wraps back to the start. No check is
 * made that `amount` units were actually available.
 *
 * @param pc     producer/consumer state
 * @param amount number of units the caller has finished with
 * @return `amount`, unchanged
 */
unsigned int pc_consumed(produconsum_t pc, unsigned int amount)
{
    unsigned int before = pc->consumed;
    unsigned int ring = 2 * pc->size;
    unsigned int after = before + amount;

    /* Same wrap test as comparing the old position against ring - amount. */
    if (before >= ring - amount)
        after -= ring;

    pc->consumed = after;
    return amount;
}
/**
 * Wait, without a timeout, until at least one unit is available or the
 * producer has signalled end of data.
 *
 * @param pc producer/consumer state
 * @return the amount reported by _consumeAny
 */
unsigned int pc_consumeAny(produconsum_t pc)
{
    struct timespec *noDeadline = 0;

    return _consumeAny(pc, 1, noDeadline);
}
/**
 * Like pc_consumeAny, but gives up once the supplied timeout expires.
 *
 * @param pc producer/consumer state
 * @param ts timeout forwarded to _consumeAny
 * @return the amount reported by _consumeAny, which may be 0 on timeout
 */
unsigned int pc_consumeAnyWithTimeout(produconsum_t pc, struct timespec *ts)
{
    const unsigned int atLeast = 1;

    return _consumeAny(pc, atLeast, ts);
}
/**
 * Wait for at least one unit and report how much can be taken without
 * crossing the end of the buffer.
 *
 * @param pc producer/consumer state
 * @return the result of pc_consumeContiguousMinAmount(pc, 1)
 */
unsigned int pc_consumeAnyContiguous(produconsum_t pc)
{
    const unsigned int atLeast = 1;

    return pc_consumeContiguousMinAmount(pc, atLeast);
}
/**
 * Wait for at least `amount` units, then cap the result at the number of
 * units left between the consumer's offset and the end of the buffer.
 *
 * @param pc     producer/consumer state
 * @param amount minimum amount to wait for
 * @return the available amount, limited to size - (consumed % size)
 */
unsigned int pc_consumeContiguousMinAmount(produconsum_t pc, unsigned int amount)
{
    unsigned int available = _consumeAny(pc, amount, 0);
    unsigned int untilWrap = pc->size - (pc->consumed % pc->size);

    return (available > untilWrap) ? untilWrap : available;
}
/**
 * Wait, without a timeout, until at least `amount` units are available or
 * the producer has signalled end of data.
 *
 * @param pc     producer/consumer state
 * @param amount minimum amount to wait for
 * @return the amount reported by _consumeAny
 */
unsigned int pc_consume(produconsum_t pc, unsigned int amount)
{
    unsigned int available = _consumeAny(pc, amount, 0);

    return available;
}
/**
 * Offset of the consumer within the buffer.
 *
 * @param pc producer/consumer state
 * @return the consumer counter reduced modulo size
 */
unsigned int pc_getConsumerPosition(produconsum_t pc)
{
    unsigned int offset = pc->consumed % pc->size;

    return offset;
}
/**
 * Offset of the producer within the buffer.
 *
 * @param pc producer/consumer state
 * @return the producer counter reduced modulo size
 */
unsigned int pc_getProducerPosition(produconsum_t pc)
{
    unsigned int offset = pc->produced % pc->size;

    return offset;
}
/**
 * Capacity of the tracked buffer.
 *
 * @param pc producer/consumer state
 * @return the size given to pc_makeProduconsum
 */
unsigned int pc_getSize(produconsum_t pc)
{
    unsigned int capacity = pc->size;

    return capacity;
}