forked from bigbluebutton/bigbluebutton
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathweb_hooks.js
149 lines (132 loc) · 5.72 KB
/
web_hooks.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const request = require("request");
const config = require("config");
const Hook = require("./hook.js");
const IDMapping = require("./id_mapping.js");
const Logger = require("./logger.js");
const MessageMapping = require("./messageMapping.js");
const UserMapping = require("./userMapping.js");
// Web hooks will listen for events on redis coming from BigBlueButton and
// perform HTTP calls with them to all registered hooks.
module.exports = class WebHooks {
constructor() {
this.subscriberEvents = Application.redisPubSubClient();
}
start(callback) {
this._subscribeToEvents();
typeof callback === 'function' ? callback(null,"w") : undefined;
}
// Subscribe to the events on pubsub that might need to be sent in callback calls.
_subscribeToEvents() {
this.subscriberEvents.on("psubscribe", (channel, count) => Logger.info(`[WebHooks] subscribed to:${channel}`));
this.subscriberEvents.on("pmessage", (pattern, channel, message) => {
let raw;
const processMessage = () => {
Logger.info(`[WebHooks] processing message on [${channel}]:`, JSON.stringify(message));
this._processEvent(message, raw);
};
try {
raw = JSON.parse(message);
let messageMapped = new MessageMapping();
messageMapped.mapMessage(JSON.parse(message));
message = messageMapped.mappedObject;
if (!_.isEmpty(message)) {
const intId = message.data.attributes.meeting["internal-meeting-id"];
IDMapping.reportActivity(intId);
// First treat meeting events to add/remove ID mappings
switch (message.data.id) {
case "meeting-created":
Logger.info(`[WebHooks] got create message on meetings channel [${channel}]:`, message);
IDMapping.addOrUpdateMapping(intId, message.data.attributes.meeting["external-meeting-id"], (error, result) => {
// has to be here, after the meeting was created, otherwise create calls won't generate
// callback calls for meeting hooks
processMessage();
});
break;
case "user-joined":
UserMapping.addOrUpdateMapping(message.data.attributes.user["internal-user-id"],message.data.attributes.user["external-user-id"], intId, message.data.attributes.user, () => {
processMessage();
});
break;
case "user-left":
UserMapping.removeMapping(message.data.attributes.user["internal-user-id"], () => { processMessage(); });
break;
case "meeting-ended":
UserMapping.removeMappingMeetingId(intId, () => { processMessage(); });
break;
default:
processMessage();
}
}
} catch (e) {
Logger.error("[WebHooks] error processing the message:", JSON.stringify(raw), ":", e.message);
}
});
config.get("hooks.channels").forEach((channel) => {
this.subscriberEvents.psubscribe(channel);
});
}
// Send raw data to hooks that are not expecting mapped messages
_processRaw(message) {
let idFromMessage;
let hooks = Hook.allGlobalSync();
// Add hooks for the specific meeting that expect raw data
// Get meetingId for a raw message that was previously mapped by another webhook application or if it's straight from redis
idFromMessage = this._findMeetingID(message);
if (idFromMessage != null) {
const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
// Notify the hooks that expect raw data
async.forEach(hooks, (hook) => {
if (hook.getRaw) {
Logger.info("[WebHooks] enqueueing a raw message in the hook:", hook.callbackURL);
hook.enqueue(message);
}
});
} // Put foreach inside the if to avoid pingpong events
}
_findMeetingID(message) {
if (message.data) {
return message.data.attributes.meeting["internal-meeting-id"];
}
if (message.payload) {
return message.payload.meeting_id;
}
if (message.envelope && message.envelope.routing && message.envelope.routing.meetingId) {
return message.envelope.routing.meetingId;
}
if (message.header && message.header.body && message.header.body.meetingId) {
return message.header.body.meetingId;
}
if (message.core && message.core.body) {
return message.core.body.props ? message.core.body.props.meetingProp.intId : message.core.body.meetingId;
}
return undefined;
}
// Processes an event received from redis. Will get all hook URLs that
// should receive this event and start the process to perform the callback.
_processEvent(message, raw) {
// Get all global hooks
let hooks = Hook.allGlobalSync();
// filter the hooks that need to receive this event
// add hooks that are registered for this specific meeting
const idFromMessage = message.data != null ? message.data.attributes.meeting["internal-meeting-id"] : undefined;
if (idFromMessage != null) {
const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
}
// Notify every hook asynchronously, if hook N fails, it won't block hook N+k from receiving its message
async.forEach(hooks, (hook) => {
if (!hook.getRaw) {
Logger.info("[WebHooks] enqueueing a message in the hook:", hook.callbackURL);
hook.enqueue(message);
}
});
const sendRaw = hooks.some(hook => { return hook.getRaw });
if (sendRaw && config.get("hooks.getRaw")) {
this._processRaw(raw);
}
}
};