forked from OptimalBits/bull
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_when_current_jobs_finished.js
146 lines (118 loc) · 3.31 KB
/
test_when_current_jobs_finished.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
'use strict';
const expect = require('chai').expect;
const redis = require('ioredis');
const utils = require('./utils');
const delay = require('delay');
const sinon = require('sinon');
describe('.whenCurrentJobsFinished', () => {
let client;
beforeEach(() => {
client = new redis();
return client.flushdb();
});
afterEach(async () => {
sinon.restore();
await utils.cleanupQueues();
await client.flushdb();
return client.quit();
});
it('should handle queue with no processor', async () => {
const queue = await utils.newQueue();
expect(await queue.whenCurrentJobsFinished()).to.equal(undefined);
});
it('should handle queue with no jobs', async () => {
const queue = await utils.newQueue();
queue.process(() => Promise.resolve());
expect(await queue.whenCurrentJobsFinished()).to.equal(undefined);
});
it('should wait for job to complete', async () => {
const queue = await utils.newQueue();
await queue.add({});
let finishJob;
// wait for job to be active
await new Promise(resolve => {
queue.process(() => {
resolve();
return new Promise(resolve => {
finishJob = resolve;
});
});
});
let isFulfilled = false;
const finished = queue.whenCurrentJobsFinished().then(() => {
isFulfilled = true;
});
await delay(100);
expect(isFulfilled).to.equal(false);
finishJob();
expect(await finished).to.equal(
undefined,
'whenCurrentJobsFinished should resolve once jobs are finished'
);
});
it('should wait for all jobs to complete', async () => {
const queue = await utils.newQueue();
// add multiple jobs to queue
await queue.add({});
await queue.add({});
let finishJob1;
let finishJob2;
// wait for all jobs to be active
await new Promise(resolve => {
let callCount = 0;
queue.process(2, () => {
callCount++;
if (callCount === 1) {
return new Promise(resolve => {
finishJob1 = resolve;
});
}
resolve();
return new Promise(resolve => {
finishJob2 = resolve;
});
});
});
let isFulfilled = false;
const finished = queue.whenCurrentJobsFinished().then(() => {
isFulfilled = true;
});
finishJob2();
await delay(100);
expect(isFulfilled).to.equal(
false,
'should not fulfill until all jobs are finished'
);
finishJob1();
await delay(100);
expect(await finished).to.equal(
undefined,
'whenCurrentJobsFinished should resolve once all jobs are finished'
);
});
it('should wait for job to fail', async () => {
const queue = await utils.newQueue();
await queue.add({});
let rejectJob;
// wait for job to be active
await new Promise(resolve => {
queue.process(() => {
resolve();
return new Promise((resolve, reject) => {
rejectJob = reject;
});
});
});
let isFulfilled = false;
const finished = queue.whenCurrentJobsFinished().then(() => {
isFulfilled = true;
});
await delay(100);
expect(isFulfilled).to.equal(false);
rejectJob();
expect(await finished).to.equal(
undefined,
'whenCurrentJobsFinished should resolve once jobs are finished'
);
});
});