forked from bigbluebutton/bigbluebutton
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhook.js
325 lines (288 loc) · 11.6 KB
/
hook.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const config = require("./config.js");
const CallbackEmitter = require("./callback_emitter.js");
const IDMapping = require("./id_mapping.js");
const Logger = require("./logger.js");
// The database of hooks.
// Used always from memory, but saved to redis for persistence.
//
// Format:
// { id: Hook }
// Format on redis:
// * a SET "...:hooks" with all ids
// * a HASH "...:hook:<id>" for each hook with some of its attributes
let db = {};
let nextID = 1;
// The representation of a hook and its properties. Stored in memory and persisted
// to redis.
// Hooks can be global, receiving callback calls for events from all meetings on the
// server, or for a specific meeting. If an `externalMeetingID` is set in the hook,
// it will only receive calls related to this meeting, otherwise it will be global.
// Events are kept in a queue to be sent in the order they are received.
// But if the requests are going by but taking too long, the queue might be increasing
// faster than the callbacks are made. In this case the events will be concatenated
// and send up to 10 events in every post
module.exports = class Hook {
constructor() {
this.id = null;
this.callbackURL = null;
this.externalMeetingID = null;
this.queue = [];
this.emitter = null;
this.redisClient = redis.createClient();
this.permanent = false;
this.backupURL = [];
this.getRaw = false;
}
save(callback) {
this.redisClient.hmset(config.redis.keys.hook(this.id), this.toRedis(), (error, reply) => {
if (error != null) { Logger.error("[Hook] error saving hook to redis:", error, reply); }
this.redisClient.sadd(config.redis.keys.hooks, this.id, (error, reply) => {
if (error != null) { Logger.error("[Hook] error saving hookID to the list of hooks:", error, reply); }
db[this.id] = this;
(typeof callback === 'function' ? callback(error, db[this.id]) : undefined);
});
});
}
destroy(callback) {
this.redisClient.srem(config.redis.keys.hooks, this.id, (error, reply) => {
if (error != null) { Logger.error("[Hook] error removing hookID from the list of hooks:", error, reply); }
this.redisClient.del(config.redis.keys.hook(this.id), error => {
if (error != null) { Logger.error("[Hook] error removing hook from redis:", error); }
if (db[this.id]) {
delete db[this.id];
(typeof callback === 'function' ? callback(error, true) : undefined);
} else {
(typeof callback === 'function' ? callback(error, false) : undefined);
}
});
});
}
// Is this a global hook?
isGlobal() {
return (this.externalMeetingID == null);
}
// The meeting from which this hook should receive events.
targetMeetingID() {
return this.externalMeetingID;
}
// Puts a new message in the queue. Will also trigger a processing in the queue so this
// message might be processed instantly.
enqueue(message) {
this.redisClient.llen(config.redis.keys.events(this.id), (error, reply) => {
const length = reply;
if (length < config.hooks.queueSize) {
Logger.info(`[Hook] ${this.callbackURL} enqueueing message:`, JSON.stringify(message));
// Add message to redis queue
this.redisClient.rpush(config.redis.keys.events(this.id), JSON.stringify(message), (error,reply) => {});
if (error != null) { Logger.error("[Hook] error pushing event to redis queue:", JSON.stringify(message), error); }
this.queue.push(JSON.stringify(message));
this._processQueue();
} else {
Logger.warn("[Hook] queue size exceed, event:", JSON.stringify(message));
}
});
}
toRedis() {
const r = {
"hookID": this.id,
"callbackURL": this.callbackURL,
"permanent": this.permanent,
"backupURL": this.backupURL,
"getRaw": this.getRaw
};
if (this.externalMeetingID != null) { r.externalMeetingID = this.externalMeetingID; }
return r;
}
fromRedis(redisData) {
this.id = parseInt(redisData.hookID);
this.callbackURL = redisData.callbackURL;
this.permanent = redisData.permanent;
this.backupURL = redisData.backupURL;
this.getRaw = redisData.getRaw;
if (redisData.externalMeetingID != null) {
this.externalMeetingID = redisData.externalMeetingID;
} else {
this.externalMeetingID = null;
}
}
// Gets the first message in the queue and start an emitter to send it. Will only do it
// if there is no emitter running already and if there is a message in the queue.
_processQueue() {
// Will try to send up to 10 messages together if they're enqueued
const lengthIn = this.queue.length > 10 ? 10 : this.queue.length;
let num = lengthIn + 1;
// Concat messages
let message = this.queue.slice(0,lengthIn);
message = message.join(",");
if ((message == null) || (this.emitter != null) || (lengthIn <= 0)) { return; }
// Add params so emitter will 'know' when a hook is permanent and have backupURLs
this.emitter = new CallbackEmitter(this.callbackURL, message, this.backupURL);
this.emitter.start(this.permanent);
this.emitter.on("success", () => {
delete this.emitter;
while ((num -= 1)) {
// Remove the sent message from redis
this.redisClient.lpop(config.redis.keys.events(this.id), (error, reply) => {
if (error != null) { return Logger.error("[Hook] error removing event from redis queue:", error); }
});
this.queue.shift();
} // pop the first message just sent
this._processQueue(); // go to the next message
});
// gave up trying to perform the callback, remove the hook forever if the hook's not permanent (emmiter will validate that)
return this.emitter.on("stopped", error => {
Logger.warn("[Hook] too many failed attempts to perform a callback call, removing the hook for:", this.callbackURL);
this.destroy();
});
}
static addSubscription(callbackURL, meetingID, getRaw, callback) {
//Since we can pass a list of URLs to serve as backup for the permanent hook, we need to check that
const firstURL = callbackURL instanceof Array ? callbackURL[0] : callbackURL;
let hook = Hook.findByCallbackURLSync(firstURL);
if (hook != null) {
return (typeof callback === 'function' ? callback(new Error("There is already a subscription for this callback URL"), hook) : undefined);
} else {
let msg = `[Hook] adding a hook with callback URL: [${firstURL}],`;
if (meetingID != null) { msg += ` for the meeting: [${meetingID}]`; }
Logger.info(msg);
hook = new Hook();
hook.callbackURL = firstURL;
hook.externalMeetingID = meetingID;
hook.getRaw = getRaw;
hook.permanent = config.hooks.aggr.some( url => {
return url === firstURL
});
if (hook.permanent) { hook.id = 1;nextID++; } else { hook.id = nextID++; }
// Create backup URLs list
let backupURLs = callbackURL instanceof Array ? callbackURL : [];
backupURLs.push(firstURL); backupURLs.shift();
hook.backupURL = backupURLs;
Logger.info("[Hook] Backup URLs:", hook.backupURL);
// Sync permanent queue
if (hook.permanent) {
hook.redisClient.llen(config.redis.keys.events(hook.id), (error, len) => {
if (len > 0) {
const length = len;
hook.redisClient.lrange(config.redis.keys.events(hook.id), 0, len, (error, elements) => {
elements.forEach(element => {
hook.queue.push(element);
});
if (hook.queue.length > 0) { return hook._processQueue(); }
});
}
});
}
hook.save((error, hook) => typeof callback === 'function' ? callback(error, hook) : undefined);
}
}
static removeSubscription(hookID, callback) {
let hook = Hook.getSync(hookID);
if (((hook != null) && (hook.permanent === "false")) || (hook.permanent === false)) {
let msg = `[Hook] removing the hook with callback URL: [${hook.callbackURL}],`;
if (hook.externalMeetingID != null) { msg += ` for the meeting: [${hook.externalMeetingID}]`; }
Logger.info(msg);
hook.destroy((error, removed) => typeof callback === 'function' ? callback(error, removed) : undefined);
} else {
return (typeof callback === 'function' ? callback(null, false) : undefined);
}
}
static countSync() {
return Object.keys(db).length;
}
static getSync(id) {
return db[id];
}
static firstSync() {
const keys = Object.keys(db);
if (keys.length > 0) {
return db[keys[0]];
} else {
return null;
}
}
static findByExternalMeetingIDSync(externalMeetingID) {
const hooks = Hook.allSync();
return _.filter(hooks, hook => (externalMeetingID != null) && (externalMeetingID === hook.externalMeetingID));
}
static allGlobalSync() {
const hooks = Hook.allSync();
return _.filter(hooks, hook => hook.isGlobal());
}
static allSync() {
let arr = Object.keys(db).reduce(function(arr, id) {
arr.push(db[id]);
return arr;
}
, []);
return arr;
}
static clearSync() {
for (let id in db) {
delete db[id];
}
return db = {};
}
static findByCallbackURLSync(callbackURL) {
for (let id in db) {
if (db[id].callbackURL === callbackURL) {
return db[id];
}
}
}
static initialize(callback) {
Hook.resync(callback);
}
// Gets all hooks from redis to populate the local database.
// Calls `callback()` when done.
static resync(callback) {
let client = redis.createClient();
// Remove previous permanent hook (always ID = 1)
client.srem(config.redis.keys.hooks, 1, (error, reply) => {
if (error != null) { Logger.error("[Hook] error removing previous permanent hook from list:", error); }
client.del(config.redis.keys.hook(1), error => {
if (error != null) { Logger.error("[Hook] error removing previous permanent hook from redis:", error); }
});
});
let tasks = [];
client.smembers(config.redis.keys.hooks, (error, hooks) => {
if (error != null) { Logger.error("[Hook] error getting list of hooks from redis:", error); }
hooks.forEach(id => {
tasks.push(done => {
client.hgetall(config.redis.keys.hook(id), function(error, hookData) {
if (error != null) { Logger.error("[Hook] error getting information for a hook from redis:", error); }
if (hookData != null) {
let length;
let hook = new Hook();
hook.fromRedis(hookData);
// sync events queue
client.llen(config.redis.keys.events(hook.id), (error, len) => {
length = len;
client.lrange(config.redis.keys.events(hook.id), 0, len, (error, elements) => {
elements.forEach(element => {
hook.queue.push(element);
});
});
});
// Persist hook to redis
hook.save( (error, hook) => {
if (hook.id >= nextID) { nextID = hook.id + 1; }
if (hook.queue.length > 0) { hook._processQueue(); }
done(null, hook);
});
} else {
done(null, null);
}
});
});
});
async.series(tasks, function(errors, result) {
hooks = _.map(Hook.allSync(), hook => `[${hook.id}] ${hook.callbackURL}`);
Logger.info("[Hook] finished resync, hooks registered:", hooks);
(typeof callback === 'function' ? callback() : undefined);
});
});
}
};