-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathasymmetric-intqueue.js
388 lines (341 loc) · 12.3 KB
/
asymmetric-intqueue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* Asymmetric bounded unidirectional integer-bundle queues.
*
* All data that can be communicated through shared memory can be
* marshaled as integer data, so these queues are general building
* blocks for inter-thread data communication through shared memory.
*
* There are two data structures:
*
* MasterProducerIntQueue sends int32 bundles from the master to the
* workers.
*
* WorkerProducerIntQueue sends int32 bundles from worker producers
* to the master.
*
* There are different implementations and APIs for the two directions
* of the queue because the master is single-threaded and can't block,
* while there can be concurrent blocking workers. (The two data
* types could be merged but there doesn't seem to be any point.)
*
*
* Data is communicated as bundles of int32 values, such a bundle is
* called an "item" below. At the producer API an item is something
* that has a length property and responds to indexed read operations
* - Arrays, TypedArrays, or something application-specific. At the
* consumer API, an item comes out as an Array.
*
*
* MasterProducerIntQueue producer API:
*
* q.putOrFail(item) => true if the item was inserted, false if not
* q.callWhenCanPut(itemSize, callback) => true if callback was invoked directly, otherwise false
*
* MasterProducerIntQueue consumer API (worker side):
*
* q.canTake() => true if an item is available
* q.takeOrFail() => item or null
* q.take() => item, blocks until available
*
*
* WorkerProducerIntQueue producer API:
*
* q.putOrFail(item) => true if the item was inserted, false if not
* q.put(item) => undefined, blocks until space is available
*
* WorkerProducerIntQueue consumer API (master side):
*
* q.canTake() => true if an item is available
* q.takeOrFail() => item or null
* q.callWhenCanTake(callback) => true if callback was invoked directly, otherwise false
*/
/* IMPORTANT USAGE NOTE:
*
* Clients of this library must ensure that
* AsymmetricSynchronic.filterEvent is invoked on Message events
* received in the master. See AsymmetricSynchronic for more
* information.
*
* (Yes, this is noncomposable in an interesting way. It's possible
* for each library to install a handler, and it'll work, but not
* great.)
*/
/* Implementation.
*
* Both queues have the same structure.
*
* I is the insertion pointer: the circular buffer index of the first
* available slot past the last item. Updated atomically by the master.
*
* R is the removal pointer: the circular buffer index of the first
* element of the oldest item still in the buffer. Updated atomically
* by the workers (see below).
*
* The buffer size bufsize is the number of data slots available.
*
* Both I and R increase modulo bufsize.
*
* The free capacity of a queue is R-I if I < R and bufsize-(I-R)
* otherwise.
*
* POP is a synchronic that is used for signaling insertions and
* removals, it has the count of integer values (not items) in the
* queue. The producer adds to POP when an item has been inserted;
* the consumer subtracts from POP when an item has been removed.
* Consumers that find the queue empty and producers that find the
* queue at capacity can wait for updates to POP.
*
* The queue is empty if free==bufsize.
*
* LATCH is a synchronic that's used as a latch to create a critical
* section in the workers: in the MasterProducerIntQueue, R is only
* updated with the latch held, allowing for concurrent consumers; in
* the WorkerProducerIntQueue, I is only updated with the latch held,
* allowing for concurrent producers.
*
* Memory layout:
* I R
* +------+-----------------------------------------------------------+
* |(meta)|ddddHdddddddd ---> HddddddHdd|
* +------------------------------------------------------------------+
*
* where meta contains I, R, POP, and LATCH.
*/
"use strict";
/**
 * Round "n" up to the nearest multiple of "v".
 *
 * The computation is a bit-mask trick, so "v" is expected to be a
 * power of two; the result is an int32 because of the bitwise ops.
 */
function align(n, v) {
    const mask = v - 1;
    const bumped = n + mask;
    return bumped & ~mask;
}
// Byte offsets of the metadata fields, relative to the start of the
// area reserved for a queue.  I and R are plain int32 cells; POP and
// LATCH are AsymmetricSynchronic cells and are placed at that type's
// alignment.  The data buffer follows the metadata at a 4-byte
// boundary.
const _IQ_I = 0;
const _IQ_R = _IQ_I + 4;
const _IQ_POP = align(_IQ_R + 4, AsymmetricSynchronic.BYTE_ALIGN);
const _IQ_LATCH = align(_IQ_POP + AsymmetricSynchronic.BYTE_SIZE, AsymmetricSynchronic.BYTE_ALIGN);
const _IQ_BUF = align(_IQ_LATCH + AsymmetricSynchronic.BYTE_SIZE, 4);
// Total metadata size; also exported as the queues' BYTE_SIZE.
const _IQ_START = _IQ_BUF;
// Int32 offsets: the same locations expressed as indices into an
// Int32Array that starts at the beginning of the reserved area.
const _IQ_INSERT = _IQ_I >> 2;
const _IQ_REMOVE = _IQ_R >> 2;
// Index of the first data slot.
const _IQ_BASE = _IQ_START >> 2;
///////////////////////////////////////////////////////////////////////////
//
// MasterProducerIntQueue.
/**
 * Create one endpoint of a queue that carries int32 bundles ("items")
 * from the master to the workers.
 *
 * "sab" is the shared buffer (a SharedArrayBuffer) that holds the queue.
 * "offset" is a byte offset within "sab", aligned to
 *   MasterProducerIntQueue.BYTE_ALIGN.
 * "numBytes" is the size of the reserved area in "sab"; it must exceed
 *   MasterProducerIntQueue.BYTE_SIZE by at least 4.
 * "isMaster" is true if this is being constructed on the master thread.
 *
 * The master endpoint resets the insertion and removal pointers.
 * NOTE(review): presumably the master endpoint must therefore be
 * constructed before any worker starts using its endpoint - confirm.
 *
 * Throws Error if the reserved area leaves no room for data slots.
 */
function MasterProducerIntQueue(sab, offset, numBytes, isMaster) {
    this._size = (numBytes - MasterProducerIntQueue.BYTE_SIZE) >> 2;
    if (this._size <= 0)
        throw new Error("Buffer too small: " + numBytes);
    // Remembered so that _checkAPI can reject calls from the wrong side.
    this._isMaster = !!isMaster;
    this._ia = new Int32Array(sab, offset, numBytes >> 2);
    this._pop = new AsymmetricSynchronic(sab, offset + _IQ_POP, isMaster, 0);
    this._latch = new AsymmetricSynchronic(sab, offset + _IQ_LATCH, isMaster, 0);
    if (isMaster) {
        Atomics.store(this._ia, _IQ_INSERT, 0);
        Atomics.store(this._ia, _IQ_REMOVE, 0);
    }
}

/**
 * The amount of space needed for the MasterProducerIntQueue for its
 * internal data structures.  Any additional space should be divisible
 * by 4 (for integer data) and will be used for the buffer.
 */
MasterProducerIntQueue.BYTE_SIZE = _IQ_START;

/**
 * The required byte alignment, divisible by 4, no larger than 16.
 */
MasterProducerIntQueue.BYTE_ALIGN = AsymmetricSynchronic.BYTE_ALIGN;

// The implementation uses this equivalence: the "when" value handed to
// callbacks by the synchronic is passed straight through to clients.
MasterProducerIntQueue.IMMEDIATE = AsymmetricSynchronic.IMMEDIATE;
MasterProducerIntQueue.DELAYED = AsymmetricSynchronic.DELAYED;

/**
 * Insert the item if possible, and return true if so.  Otherwise
 * return false.  Master side only.
 *
 * An item of n values occupies n+1 slots (a length header plus the
 * values), so an item with more than bufsize-1 values never fits.
 */
MasterProducerIntQueue.prototype.putOrFail = function (item) {
    this._checkAPI("putOrFail", true);
    let ia = this._ia;
    let size = this._size;
    let length = item.length;
    // Free space is derived from POP (the number of occupied slots)
    // rather than from I and R: I == R both when the buffer is empty
    // and when it is completely full, so the pointers alone cannot
    // tell the two states apart.  Consumers only ever decrease POP, so
    // this is a safe lower bound on the free space.
    if (size - this._pop.load() < length + 1)
        return false;
    let insert = Atomics.load(ia, _IQ_INSERT);
    ia[_IQ_BASE + insert] = length;
    insert = (insert + 1) % size;
    for ( let i=0 ; i < length ; i++ ) {
        ia[_IQ_BASE + insert] = item[i];
        insert = (insert + 1) % size;
    }
    // Publish the data: first the new insertion point, then the
    // population count that consumers wait on.
    Atomics.store(ia, _IQ_INSERT, insert);
    this._pop.add(length + 1);
    return true;
};

/**
 * Invoke callback when there's space available in the queue for an
 * item of length itemSize.  The callback is invoked with a value
 * indicating when it was called: MasterProducerIntQueue.IMMEDIATE if
 * it was invoked immediately, MasterProducerIntQueue.DELAYED if it
 * was invoked later.  Master side only.
 *
 * Returns true if the callback was invoked immediately, otherwise
 * false.
 *
 * Typical usage here would be to call callWhenCanPut with a thunk
 * that calls putOrFail on the same item (and asserts if the latter
 * call fails).
 *
 * Note that if itemSize+1 exceeds the buffer size the callback is
 * never invoked.
 */
MasterProducerIntQueue.prototype.callWhenCanPut = function (itemSize, callback) {
    this._checkAPI("callWhenCanPut", true);
    let size = this._size;
    let pop = this._pop;
    let check = (when) => {
        // The decision depends only on the POP value we sampled, so
        // waiting for POP to differ from that value cannot miss an
        // update.
        let oldpop = pop.load();
        if (size - oldpop < itemSize + 1)
            return pop.callWhenNotEquals(oldpop, check);
        callback(when);
        return when == MasterProducerIntQueue.IMMEDIATE;
    };
    return check(MasterProducerIntQueue.IMMEDIATE);
};

/**
 * Return true if an item is available.  Worker side only.  With
 * several consumers the answer may be stale by the time it is used.
 */
MasterProducerIntQueue.prototype.canTake = function () {
    this._checkAPI("canTake", false);
    return this._pop.load() > 0;
};

/**
 * Remove and return the oldest item as an Array, or return null if
 * the queue is empty.  Worker side only.
 *
 * The call may wait briefly for the latch that serializes concurrent
 * consumers, but it does not wait for data.
 */
MasterProducerIntQueue.prototype.takeOrFail = function () {
    this._checkAPI("takeOrFail", false);
    let ia = this._ia;
    let latch = this._latch;
    let size = this._size;
    // Enter the consumer critical section.
    while (latch.compareExchange(0,1) != 0)
        latch.waitUntilEquals(0);
    // POP, not I == R, decides emptiness: a completely full buffer
    // also has I == R.
    if (this._pop.load() == 0) {
        latch.store(0);
        return null;
    }
    let remove = Atomics.load(ia, _IQ_REMOVE);
    let n = ia[_IQ_BASE + remove];
    remove = (remove + 1) % size;
    let item = [];
    for ( ; n > 0 ; n-- ) {
        item.push(ia[_IQ_BASE + remove]);
        remove = (remove + 1) % size;
    }
    // Release the slots only after the values have been copied out.
    Atomics.store(ia, _IQ_REMOVE, remove);
    this._pop.sub(item.length + 1);
    latch.store(0);
    return item;
};

/**
 * Remove and return the oldest item as an Array, blocking until one
 * is available.  Worker side only.
 */
MasterProducerIntQueue.prototype.take = function () {
    this._checkAPI("take", false);
    for (;;) {
        let item = this.takeOrFail();
        if (item !== null)
            return item;
        this._pop.waitUntilNotEquals(0);
    }
};

/**
 * Throw if method "m" is being called on the wrong side: "masterAPI"
 * is true for methods that may only be used on the master and false
 * for methods that may only be used on a worker.
 */
MasterProducerIntQueue.prototype._checkAPI = function(m, masterAPI) {
    if (masterAPI !== this._isMaster)
        throw new Error("MasterProducerIntQueue API abuse: method '" + m + "' not available in " + (this._isMaster ? "master" : "worker"));
};
///////////////////////////////////////////////////////////////////////////
//
// WorkerProducerIntQueue.
/**
 * Create one endpoint of a queue that carries int32 bundles ("items")
 * from the workers to the master.
 *
 * "sab" is the shared buffer (a SharedArrayBuffer) that holds the queue.
 * "offset" is a byte offset within "sab", aligned to
 *   WorkerProducerIntQueue.BYTE_ALIGN.
 * "numBytes" is the size of the reserved area in "sab"; it must exceed
 *   WorkerProducerIntQueue.BYTE_SIZE by at least 4.
 * "isMaster" is true if this is being constructed on the master thread.
 *
 * The master endpoint resets the insertion and removal pointers.
 * NOTE(review): presumably the master endpoint must therefore be
 * constructed before any worker starts using its endpoint - confirm.
 *
 * Throws Error if the reserved area leaves no room for data slots.
 */
function WorkerProducerIntQueue(sab, offset, numBytes, isMaster) {
    this._size = (numBytes - WorkerProducerIntQueue.BYTE_SIZE) >> 2;
    if (this._size <= 0)
        throw new Error("Buffer too small: " + numBytes);
    // Remembered so that _checkAPI can reject calls from the wrong side.
    this._isMaster = !!isMaster;
    this._ia = new Int32Array(sab, offset, numBytes >> 2);
    this._pop = new AsymmetricSynchronic(sab, offset + _IQ_POP, isMaster, 0);
    this._latch = new AsymmetricSynchronic(sab, offset + _IQ_LATCH, isMaster, 0);
    if (isMaster) {
        Atomics.store(this._ia, _IQ_INSERT, 0);
        Atomics.store(this._ia, _IQ_REMOVE, 0);
    }
}

/**
 * The amount of space needed for the queue's internal data
 * structures; any additional space is used for the buffer.
 */
WorkerProducerIntQueue.BYTE_SIZE = _IQ_START;

/**
 * The required byte alignment of the reserved area.
 */
WorkerProducerIntQueue.BYTE_ALIGN = AsymmetricSynchronic.BYTE_ALIGN;

// The "when" value handed to callbacks by the synchronic is passed
// straight through to clients, so the constants must coincide.
WorkerProducerIntQueue.IMMEDIATE = AsymmetricSynchronic.IMMEDIATE;
WorkerProducerIntQueue.DELAYED = AsymmetricSynchronic.DELAYED;

/**
 * Insert the item if possible, and return true if so.  Otherwise
 * return false.  Worker side only.
 *
 * The call may wait briefly for the latch that serializes concurrent
 * producers, but it does not wait for space.  An item of n values
 * occupies n+1 slots (a length header plus the values).
 */
WorkerProducerIntQueue.prototype.putOrFail = function (item) {
    this._checkAPI("putOrFail", false);
    let ia = this._ia;
    let latch = this._latch;
    let size = this._size;
    let length = item.length;
    // Enter the producer critical section.
    while (latch.compareExchange(0,1) != 0)
        latch.waitUntilEquals(0);
    // Free space is derived from POP (the number of occupied slots)
    // rather than from I and R: I == R both when the buffer is empty
    // and when it is completely full.  The consumer only ever
    // decreases POP, so this is a safe lower bound.
    if (size - this._pop.load() < length + 1) {
        latch.store(0);
        return false;
    }
    let insert = Atomics.load(ia, _IQ_INSERT);
    ia[_IQ_BASE + insert] = length;
    insert = (insert + 1) % size;
    for ( let i=0 ; i < length ; i++ ) {
        ia[_IQ_BASE + insert] = item[i];
        insert = (insert + 1) % size;
    }
    // Publish the data: first the new insertion point, then the
    // population count that the consumer watches.
    Atomics.store(ia, _IQ_INSERT, insert);
    this._pop.add(length + 1);
    latch.store(0);
    return true;
};

/**
 * Insert the item, blocking until there is space for it.  Returns
 * undefined.  Worker side only.
 *
 * Note that an item that needs more slots than the buffer has will
 * block forever.
 */
WorkerProducerIntQueue.prototype.put = function (item) {
    this._checkAPI("put", false);
    for (;;) {
        // Sample POP before trying, so that a removal that happens
        // between the failed attempt and the wait is not missed.
        let oldpop = this._pop.load();
        if (this.putOrFail(item))
            break;
        this._pop.waitUntilNotEquals(oldpop);
    }
};

/**
 * Return true if an item is available.  Master side only.
 */
WorkerProducerIntQueue.prototype.canTake = function () {
    this._checkAPI("canTake", true);
    return this._pop.load() > 0;
};

/**
 * Remove and return the oldest item as an Array, or return null if
 * the queue is empty.  Master side only.
 *
 * The master is the only consumer of this queue, so R is updated
 * without taking the latch (the latch guards I among the producers).
 */
WorkerProducerIntQueue.prototype.takeOrFail = function () {
    this._checkAPI("takeOrFail", true);
    let ia = this._ia;
    let size = this._size;
    // POP, not I == R, decides emptiness: a completely full buffer
    // also has I == R.
    if (this._pop.load() == 0)
        return null;
    let remove = Atomics.load(ia, _IQ_REMOVE);
    let n = ia[_IQ_BASE + remove];
    remove = (remove + 1) % size;
    let item = [];
    for ( ; n > 0 ; n-- ) {
        item.push(ia[_IQ_BASE + remove]);
        remove = (remove + 1) % size;
    }
    // Release the slots only after the values have been copied out.
    Atomics.store(ia, _IQ_REMOVE, remove);
    this._pop.sub(item.length + 1);
    return item;
};

/**
 * Invoke callback when an item is available.  The callback is invoked
 * with WorkerProducerIntQueue.IMMEDIATE if it was invoked immediately
 * and WorkerProducerIntQueue.DELAYED if it was invoked later.  Master
 * side only.
 *
 * Returns true if the callback was invoked immediately, otherwise
 * false.
 */
WorkerProducerIntQueue.prototype.callWhenCanTake = function (callback) {
    this._checkAPI("callWhenCanTake", true);
    let pop = this._pop;
    let check = (when) => {
        if (pop.load() == 0)
            return pop.callWhenNotEquals(0, check);
        callback(when);
        return when == WorkerProducerIntQueue.IMMEDIATE;
    };
    return check(WorkerProducerIntQueue.IMMEDIATE);
};

/**
 * Throw if method "m" is being called on the wrong side: "onMaster"
 * is true for methods that may only be used on the master and false
 * for methods that may only be used on a worker.
 */
WorkerProducerIntQueue.prototype._checkAPI = function(m, onMaster) {
    if (onMaster !== this._isMaster)
        throw new Error("WorkerProducerIntQueue API abuse: method '" + m + "' not available in " + (this._isMaster ? "master" : "worker"));
};