forked from inolen/quakejs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
master.js
350 lines (274 loc) · 8.32 KB
/
master.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
var _ = require('underscore');
var http = require('http');
var logger = require('winston');
var opt = require('optimist');
var url = require('url');
var WebSocketClient = require('ws');
var WebSocketServer = require('ws').Server;
var argv = require('optimist')
.describe('config', 'Location of the configuration file').default('config', './config.json')
.argv;
if (argv.h || argv.help) {
opt.showHelp();
return;
}
logger.cli();
logger.level = 'debug';
var config = loadConfig(argv.config);
var clients = [];
var servers = {};
var pruneInterval = 350 * 1000;
function formatOOB(data) {
var str = '\xff\xff\xff\xff' + data + '\x00';
var buffer = new ArrayBuffer(str.length);
var view = new Uint8Array(buffer);
for (var i = 0; i < str.length; i++) {
view[i] = str.charCodeAt(i);
}
return buffer;
}
function stripOOB(buffer) {
var view = new DataView(buffer);
if (view.getInt32(0) !== -1) {
return null;
}
var str = '';
for (var i = 4 /* ignore leading -1 */; i < buffer.byteLength - 1 /* ignore trailing \0 */; i++) {
var c = String.fromCharCode(view.getUint8(i));
str += c;
}
return str;
}
function parseInfoString(str) {
var data = {};
var split = str.split('\\');
// throw when split.length isn't even?
for (var i = 0; i < split.length - 1; i += 2) {
var key = split[i];
var value = split[i+1];
data[key] = value;
}
}
/**********************************************************
*
* messages
*
**********************************************************/
var CHALLENGE_MIN_LENGTH = 9;
var CHALLENGE_MAX_LENGTH = 12;
function buildChallenge() {
var challenge = '';
var length = CHALLENGE_MIN_LENGTH - 1 +
parseInt(Math.random() * (CHALLENGE_MAX_LENGTH - CHALLENGE_MIN_LENGTH + 1), 10);
for (var i = 0; i < length; i++) {
var c;
do {
c = Math.floor(Math.random() * (126 - 33 + 1) + 33); // -> 33 ... 126 (inclusive)
} while (c === '\\'.charCodeAt(0) || c === ';'.charCodeAt(0) || c === '"'.charCodeAt(0) || c === '%'.charCodeAt(0) || c === '/'.charCodeAt(0));
challenge += String.fromCharCode(c);
}
return challenge;
}
function handleGetServers(conn, data) {
logger.info(conn.addr + ':' + conn.port + ' ---> getservers');
sendGetServersResponse(conn, servers);
}
function handleHeartbeat(conn, data) {
logger.info(conn.addr + ':' + conn.port + ' ---> heartbeat');
sendGetInfo(conn);
}
function handleInfoResponse(conn, data) {
logger.info(conn.addr + ':' + conn.port + ' ---> infoResponse');
var info = parseInfoString(data);
// TODO validate data
updateServer(conn.addr, conn.port);
}
function sendGetInfo(conn) {
var challenge = buildChallenge();
logger.info(conn.addr + ':' + conn.port + ' <--- getinfo with challenge \"' + challenge + '\"');
var buffer = formatOOB('getinfo ' + challenge);
conn.socket.send(buffer, { binary: true });
}
function sendGetServersResponse(conn, servers) {
var msg = 'getserversResponse';
for (var id in servers) {
if (!servers.hasOwnProperty(id)) {
continue;
}
var server = servers[id];
var octets = server.addr.split('.').map(function (n) {
return parseInt(n, 10);
});
msg += '\\';
msg += String.fromCharCode(octets[0] & 0xff);
msg += String.fromCharCode(octets[1] & 0xff);
msg += String.fromCharCode(octets[2] & 0xff)
msg += String.fromCharCode(octets[3] & 0xff);
msg += String.fromCharCode((server.port & 0xff00) >> 8);
msg += String.fromCharCode(server.port & 0xff);
}
msg += '\\EOT';
logger.info(conn.addr + ':' + conn.port + ' <--- getserversResponse with ' + Object.keys(servers).length + ' server(s)');
var buffer = formatOOB(msg);
conn.socket.send(buffer, { binary: true });
}
/**********************************************************
*
* servers
*
**********************************************************/
function serverid(addr, port) {
return addr + ':' + port;
}
function updateServer(addr, port) {
var id = serverid(addr, port);
var server = servers[id];
if (!server) {
server = servers[id] = { addr: addr, port: port };
}
server.lastUpdate = Date.now();
// send partial update to all clients
for (var i = 0; i < clients.length; i++) {
sendGetServersResponse(clients[i], { id: server });
}
}
function removeServer(id) {
var server = servers[id];
delete servers[id];
logger.info(server.addr + ':' + server.port + ' timed out, ' + Object.keys(servers).length + ' server(s) currently registered');
}
function pruneServers() {
var now = Date.now();
for (var id in servers) {
if (!servers.hasOwnProperty(id)) {
continue;
}
var server = servers[id];
var delta = now - server.lastUpdate;
if (delta > pruneInterval) {
removeServer(id);
}
}
}
/**********************************************************
*
* clients
*
**********************************************************/
function handleSubscribe(conn) {
addClient(conn);
// send all servers upon subscribing
sendGetServersResponse(conn, servers);
}
function addClient(conn) {
var idx = clients.indexOf(conn);
if (idx !== -1) {
return; // already subscribed
}
logger.info(conn.addr + ':' + conn.port + ' ---> subscribe');
clients.push(conn);
}
function removeClient(conn) {
var idx = clients.indexOf(conn);
if (idx === -1) {
return; // conn may have belonged to a server
}
var conn = clients[idx];
logger.info(conn.addr + ':' + conn.port + ' ---> unsubscribe');
clients.splice(idx, 1);
}
/**********************************************************
*
* main
*
**********************************************************/
function getRemoteAddress(ws) {
// by default, check the underlying socket's remote address
var address = ws._socket.remoteAddress;
// if this is an x-forwarded-for header (meaning the request
// has been proxied), use it
if (ws.upgradeReq.headers['x-forwarded-for']) {
address = ws.upgradeReq.headers['x-forwarded-for'];
}
return address;
}
function getRemotePort(ws) {
var port = ws._socket.remotePort;
if (ws.upgradeReq.headers['x-forwarded-port']) {
port = ws.upgradeReq.headers['x-forwarded-port'];
}
return port;
}
function connection(ws) {
this.socket = ws;
this.addr = getRemoteAddress(ws);
this.port = getRemotePort(ws);
}
function loadConfig(configPath) {
var config = {
port: 27950
};
try {
console.log('Loading config file from ' + configPath + '..');
var data = require(configPath);
_.extend(config, data);
} catch (e) {
console.log('Failed to load config', e);
}
return config;
}
(function main() {
var server = http.createServer();
var wss = new WebSocketServer({
server: server
});
wss.on('connection', function (ws) {
var conn = new connection(ws);
var first = true;
ws.on('message', function (buffer, flags) {
if (!flags.binary) {
return;
}
// node Buffer to ArrayBuffer
var view = Uint8Array.from(buffer);
var buffer = view.buffer;
// check to see if this is emscripten's port identifier message
var wasfirst = first;
first = false;
if (wasfirst &&
view.byteLength === 10 &&
view[0] === 255 && view[1] === 255 && view[2] === 255 && view[3] === 255 &&
view[4] === 'p'.charCodeAt(0) && view[5] === 'o'.charCodeAt(0) && view[6] === 'r'.charCodeAt(0) && view[7] === 't'.charCodeAt(0)) {
conn.port = ((view[8] << 8) | view[9]);
return;
}
var msg = stripOOB(buffer);
if (!msg) {
removeClient(conn);
return;
}
if (msg.indexOf('getservers ') === 0) {
handleGetServers(conn, msg.substr(11));
} else if (msg.indexOf('heartbeat ') === 0) {
handleHeartbeat(conn, msg.substr(10));
} else if (msg.indexOf('infoResponse\n') === 0) {
handleInfoResponse(conn, msg.substr(13));
} else if (msg.indexOf('subscribe') === 0) {
handleSubscribe(conn);
} else {
console.error('unexpected message "' + msg + '"');
}
});
ws.on('error', function (err) {
removeClient(conn);
});
ws.on('close', function () {
removeClient(conn);
});
});
// listen only on 0.0.0.0 to force ipv4
server.listen(config.port, '0.0.0.0', function() {
console.log('master server is listening on port ' + server.address().port);
});
setInterval(pruneServers, pruneInterval);
})();