forked from kriskowal/q
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.js
159 lines (145 loc) · 4.02 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
var Q = require("../q");
// NOTE(review): this value is not referenced anywhere else in this file —
// it looks like a leftover; confirm before removing.
var renewalInterval = 1e10; // a long time
/**
 * Creates an asynchronous FIFO queue built from a chain of deferreds.
 *
 * Each cell of the chain is an object `{head, tail}` where `head` is a sent
 * value and `tail` is a promise for the following cell. Values may be
 * requested with `next()` before they have been provided with `send()`.
 *
 * @returns {{send: Function, next: Function}} `send(value)` appends a value;
 * `next()` returns a promise for the next value in send order.
 */
function makeQueue() {
    // `ends.promise` is the read end of the chain (the next cell to consume)
    // and `ends.resolve` is the write end (fills the next empty cell).
    var ends = Q.defer();

    function send(value) {
        var link = Q.defer();
        // Fill the current empty cell and point its tail at a fresh one.
        ends.resolve({
            "head": value,
            "tail": link.promise
        });
        // Subsequent sends fill the fresh cell.
        ends.resolve = link.resolve;
    }

    function next() {
        var cell = ends.promise;
        var head = Q.get(cell, "head");
        // Advance the read end so the following call sees the next cell.
        ends.promise = Q.get(cell, "tail");
        return head;
    }

    return {
        "send": send,
        "next": next
    };
}
/**
 * Creates a promise-backed stream of values.
 *
 * Values pushed with `send` are buffered in a queue created by `makeQueue`
 * and handed, in send order, to callbacks registered through `forEach`
 * (directly, or indirectly through `all`, `map`, `filter` and `reduce`).
 * The stream completes, resolving its internal `result` deferred, once
 * `close` has been called and every accepted value has been fully handled
 * by its callback.
 *
 * @param {string} label name printed in the "CLOSED" log line; derived
 * streams are labelled `label + "|map"` / `label + "|filter"`.
 * @returns {{send: Function, close: Function, promise: Object}}
 * `send(value)` pushes a value (ignored once the stream is closed),
 * `close()` marks the end of input, and `promise` is the object produced by
 * `Q.makePromise` that answers `forEach`, `all`, `map`, `filter` and
 * `reduce`; any other operation is re-sent to the completion promise with
 * `Q.send`.
 */
function makeStream(label) {
    var result = Q.defer();
    var queue = makeQueue();
    // Number of accepted values whose forEach callback has not finished yet.
    var outstanding = 0;
    var closed = false;
    var finished = false;

    // Resolves the completion promise exactly once, as soon as the stream
    // is closed and no accepted value is still waiting or being handled.
    function finishIfDrained() {
        if (finished || !closed || outstanding !== 0) {
            return;
        }
        finished = true;
        console.log("CLOSED", label);
        result.resolve();
    }

    function send(value) {
        // Ignore attempts to send values to a closed stream. They must not
        // be counted either, or the stream could never drain.
        if (closed) {
            return;
        }
        outstanding++;
        queue.send(value);
    }

    function close() {
        closed = true;
        // We are not actually finished until the last value has been
        // handled; if nothing is pending (empty or already drained stream)
        // this completes the stream right away.
        finishIfDrained();
    }

    /**
     * Calls `callback(value)` with `thisp` as `this` for each value of the
     * stream. If the callback returns a promise, the value counts as
     * outstanding until that promise settles.
     *
     * @returns the promise that resolves when the stream has completed.
     */
    function forEach(callback, thisp) {
        Q.when(queue.next(), function (value) {
            // summon the next iteration
            forEach(callback, thisp);
            // handle this iteration
            return Q.call(callback, thisp, value)
            // the value is only "out the door" once its callback is done;
            // complete the stream if this was the last one.
            .fin(function () {
                outstanding--;
                finishIfDrained();
            });
        })
        .end();
        return result.promise;
    }

    var promise = Q.makePromise({
        "forEach": forEach,
        // Promise for an array of every value, in the order handled.
        "all": function () {
            var object = [];
            forEach(function (value) {
                object.push(value);
            });
            return Q.when(result.promise, function () {
                return object;
            });
        },
        // Derived stream of `callback(value)` results.
        "map": function (callback, thisp) {
            var map = makeStream(label + "|map");
            forEach(function (value) {
                // Returned so this stream does not complete (and close the
                // derived stream) before the mapped value has been sent.
                return Q.call(callback, thisp, value)
                .then(map.send);
            });
            Q.when(result.promise, map.close);
            return map.promise;
        },
        // Derived stream of the values for which `callback(value)` is truthy.
        "filter": function (callback, thisp) {
            var filter = makeStream(label + "|filter");
            forEach(function (value) {
                // Returned for the same reason as in "map".
                return Q.call(callback, thisp, value)
                .then(function (guard) {
                    if (guard) {
                        filter.send(value);
                    }
                });
            });
            Q.when(result.promise, filter.close);
            return filter.promise;
        },
        // Promise for the left fold of the stream, starting from `basis`.
        // The callback receives (accumulated, value, index, completion
        // promise).
        "reduce": function (callback, basis, thisp) {
            var i = 0;
            forEach(function (value) {
                // Chain each step on the previous one so steps run in order
                // even when the callback is asynchronous.
                basis = Q.when(basis, function (basis) {
                    return Q.call(callback, thisp, basis, value, i++, result.promise);
                });
            });
            return Q.when(result.promise, function () {
                return basis;
            });
        }
    }, function fallback(op) {
        // Re-send any operation not handled above (operation name included
        // in `args`) to the completion promise.
        var args = Array.prototype.slice.call(arguments);
        return Q.send.apply(this, [result.promise].concat(args));
    });

    return {
        send: send,
        close: close,
        promise: promise
    };
}
// Demo: push the integers 0..9 into a stream, one every 10 ms, and run them
// through a map -> filter -> reduce pipeline, logging every step.
var stream = makeStream("sequence");

stream.promise
.map(function (n) {
    console.log("mapping", n);
    // NOTE(review): presumably yields `n` after a 0 ms delay — confirm
    // against the argument order of `Q.delay` in the bundled "../q".
    return Q.delay(n, 0)
    .fin(function () {
        console.log("emitting", n);
    });
})
.filter(function (n) {
    console.log("filtering", n);
    // keep the values with a non-zero remainder (odd numbers)
    return n % 2;
})
.reduce(function (old, n) {
    console.log('reducing', old, n);
    return old + n;
}, 0)
.then(function (result) {
    console.log("result", result);
})
.end();

var n = 0;
var handle = setInterval(function () {
    stream.send(n++);
    if (n === 10) {
        console.log("CLOSING");
        stream.close();
        // Stop producing right away: values sent after close() are ignored,
        // and the timer would otherwise keep firing until (and only if) the
        // stream's promise resolves.
        clearInterval(handle);
    }
}, 10);

// Safety net: also stop the producer when the stream completes. This is a
// no-op when the interval has already been cleared above.
Q.when(stream.promise, function () {
    clearInterval(handle);
});