forked from francisrstokes/hexnut-sequence
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
95 lines (82 loc) · 2.95 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
const SEQUENCE_SYMBOL = Symbol('@@HEXNUT_SEQUENCE_SYMBOL');
const uuid = require('uuid/v4');
function getNext(sequence, name, ctx) {
const {value, done} = sequence.iterator.next(ctx.message);
if (done) {
delete ctx[SEQUENCE_SYMBOL][name];
return true;
}
sequence.waitingFor = value;
return false;
};
function initSequence(ctx, next, generator, name, explicitConnection) {
ctx[SEQUENCE_SYMBOL][name] = {
generator,
iterator: generator(ctx, next),
waitingFor: explicitConnection
? { messageType: 'connection' }
: null
};
}
async function process(explicitConnection, isInteruptable, name, sequence, ctx, next, wait = false) {
if (sequence.waitingFor === null && getNext(sequence, name, ctx)) return await next();
if (sequence.waitingFor.messageType === 'assertion') {
if (!sequence.waitingFor.pred()) {
if (!isInteruptable) {
delete ctx[SEQUENCE_SYMBOL][name];
}
return await next();
}
getNext(sequence, name, ctx);
}
if (sequence.waitingFor.messageType === 'await') {
const promiseValue = await sequence.waitingFor.fn();
const {value, done} = sequence.iterator.next(promiseValue);
sequence.waitingFor = value;
if (done) initSequence(ctx, next, sequence.generator, name, explicitConnection);
return await process(explicitConnection, isInteruptable, name, sequence, ctx, next);
}
if (wait) return;
if (ctx.type === sequence.waitingFor.messageType) {
if (sequence.waitingFor.operation) {
if (sequence.waitingFor.operation === 'matchMessage' && !sequence.waitingFor.fn(ctx.message)) {
if (!isInteruptable) {
delete ctx[SEQUENCE_SYMBOL][name];
}
return await next();
}
}
if (getNext(sequence, name, ctx)) {
initSequence(ctx, next, sequence.generator, name, explicitConnection);
}
return await process(explicitConnection, isInteruptable, name, sequence, ctx, next, true);
}
return await next();
}
const sequenceProcessor = explicitConnection => isInteruptable => generator => {
const name = uuid();
return async (ctx, next) => {
if (!ctx[SEQUENCE_SYMBOL]) ctx[SEQUENCE_SYMBOL] = {};
if (!ctx[SEQUENCE_SYMBOL][name]) initSequence(ctx, next, generator, name, explicitConnection);
return await process(explicitConnection, isInteruptable, name, ctx[SEQUENCE_SYMBOL][name], ctx, next);
};
};
module.exports = {
onConnect: sequenceProcessor(true)(true),
interruptible:sequenceProcessor(false)(true),
interuptable: sequenceProcessor(false)(true),
uninterruptible:sequenceProcessor(false)(false),
uninteruptable: sequenceProcessor(false)(false),
matchMessage: matchFn => ({
messageType: 'message',
operation: 'matchMessage',
fn: matchFn
}),
getMessage: () => ({
messageType: 'message',
operation: 'matchMessage',
fn: () => true
}),
assert: pred => ({ messageType: 'assertion', pred }),
await: promiseReturningFn => ({ messageType: 'await', fn: promiseReturningFn })
};