Skip to content

Commit

Permalink
Completed WebSocket transport.
Browse files Browse the repository at this point in the history
Added (naive) support for closing frames.
  • Loading branch information
rauchg committed May 19, 2011
1 parent 080abcc commit 4f293ae
Showing 1 changed file with 262 additions and 5 deletions.
267 changes: 262 additions & 5 deletions lib/transports/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
* Module requirements.
*/

var Transport = require('../transport');
var Transport = require('../transport')
, EventEmitter = process.EventEmitter
, crypto = require('crypto')
, parser = require('../parser');

/**
* Export the constructor.
Expand All @@ -25,6 +28,20 @@ exports = module.exports = WebSocket;
*/

function WebSocket (data, request) {
// parser
var self = this

this.parser = new Parser();
this.parser.on('data', function (packet) {
self.onMessage(parser.decodePacket(packet));
});
this.parser.on('close', function () {
self.end();
});
this.parser.on('error', function () {
self.end();
});

Transport.call(this, data, request);

this.drained = true;
Expand All @@ -48,6 +65,92 @@ WebSocket.prototype.onSocketConnect = function () {
this.socket.on('drain', function () {
self.drained = true;
});

this.buffer = true;
this.buffered = [];

if (this.req.headers.upgrade !== 'WebSocket') {
this.log.warn('WebSocket connection invalid');
this.end();
return;
}

var origin = this.req.headers.origin
, location = (this.socket.encrypted ? 'wss' : 'ws')
+ '://' + this.req.headers.host + this.req.url
, waitingForNonce = false;

if (this.req.headers['sec-websocket-key1']) {
// If we don't have the nonce yet, wait for it (HAProxy compatibility).
if (! (this.req.head && this.req.head.length >= 8)) {
waitingForNonce = true;
}

var headers = [
'HTTP/1.1 101 WebSocket Protocol Handshake'
, 'Upgrade: WebSocket'
, 'Connection: Upgrade'
, 'Sec-WebSocket-Origin: ' + origin
, 'Sec-WebSocket-Location: ' + location
];

if (this.req.headers['sec-websocket-protocol']){
headers.push('Sec-WebSocket-Protocol: '
+ this.req.headers['sec-websocket-protocol']);
}
} else {
var headers = [
'HTTP/1.1 101 Web Socket Protocol Handshake'
, 'Upgrade: WebSocket'
, 'Connection: Upgrade'
, 'WebSocket-Origin: ' + origin
, 'WebSocket-Location: ' + location
];
}

try {
this.socket.write(headers.concat('', '').join('\r\n'));
this.socket.setTimeout(0);
this.socket.setNoDelay(true);
this.socket.setEncoding('utf8');
} catch (e) {
this.end();
return;
}

if (waitingForNonce) {
this.socket.setEncoding('binary');
} else if (this.proveReception(headers)) {
self.flush();
}

var headBuffer = '';

this.socket.on('data', function (data) {
if (waitingForNonce) {
headBuffer += data;

if (headBuffer.length < 8) {
return;
}

// Restore the connection to utf8 encoding after receiving the nonce
self.socket.setEncoding('utf8');
waitingForNonce = false;

// Stuff the nonce into the location where it's expected to be
self.req.head = headBuffer.substr(0, 8);
headBuffer = '';

if (self.proveReception(headers)) {
self.flush();
}

return;
}

self.parser.add(data);
});
};

/**
Expand All @@ -56,8 +159,37 @@ WebSocket.prototype.onSocketConnect = function () {
* @api private
*/

WebSocket.prototype.write = function () {
this.drained = false;
WebSocket.prototype.write = function (data) {
if (this.open) {
this.drained = false;

if (this.buffer) {
this.buffered.push(data);
return this;
}

try {
this.socket.write('\u0000', 'binary');
this.socket.write(data, 'utf8');
this.socket.write('\uffff', 'binary');
} catch (e) {
this.end();
}
}
};

/**
* Flushes the internal buffer
*
* @api private
*/

WebSocket.prototype.flush = function () {
this.buffer = false;

for (var i = 0, l = this.buffered.length; i < l; i++) {
this.write[this.buffered.splice(0, 1)];
}
};

/**
Expand All @@ -66,14 +198,59 @@ WebSocket.prototype.write = function () {
* @api private
*/

WebSocket.prototype.writeVolatile = function () {
WebSocket.prototype.writeVolatile = function (msg) {
if (this.drained) {
this.write();
this.write(msg);
} else {
this.log.debug('ignoring volatile message, buffer not drained');
}
};

/**
* Finishes the handshake.
*
* @api private
*/

WebSocket.prototype.proveReception = function (headers) {
var self = this
, k1 = this.req.headers['sec-websocket-key1']
, k2 = this.req.headers['sec-websocket-key2'];

if (k1 && k2){
var md5 = crypto.createHash('md5');

[k1, k2].forEach(function (k) {
var n = parseInt(k.replace(/[^\d]/g, ''))
, spaces = k.replace(/[^ ]/g, '').length;

if (spaces === 0 || n % spaces !== 0){
self.log.warn('Invalid WebSocket key: "' + k + '".');
self.end();
return false;
}

n /= spaces;

md5.update(String.fromCharCode(
n >> 24 & 0xFF,
n >> 16 & 0xFF,
n >> 8 & 0xFF,
n & 0xFF));
});

md5.update(this.req.head.toString('binary'));

try {
this.socket.write(md5.digest('binary'), 'binary');
} catch (e) {
this.end();
}
}

return true;
};

/**
* Writes a payload.
*
Expand All @@ -95,5 +272,85 @@ WebSocket.prototype.payload = function (msgs) {
*/

WebSocket.prototype.doClose = function () {
try {
this.socket.write('\xff\x00', 'binary');
} catch (e) {
this.onClose();
}

this.socket.end();
};

/**
* WebSocket parser
*
* @api public
*/

function Parser () {
this.buffer = '';
this.i = 0;
};

/**
* Inherits from EventEmitter.
*/

Parser.prototype.__proto__ = EventEmitter.prototype;

/**
* Adds data to the buffer.
*
* @api public
*/

Parser.prototype.add = function (data) {
this.buffer += data;
this.parse();
};

/**
* Parses the buffer.
*
* @api private
*/

Parser.prototype.parse = function () {
for (var i = this.i, chr, l = this.buffer.length; i < l; i++){
chr = this.buffer[i];

if (this.buffer.length == 2 && this.buffer[1] == '\u0000') {
this.emit('close');
this.buffer = '';
this.i = 0;
return;
}

if (i === 0){
if (chr != '\u0000')
this.error('Bad framing. Expected null byte as first frame');
else
continue;
}

if (chr == '\ufffd'){
this.emit('data', this.buffer.substr(1, this.buffer.length - 2));
this.buffer = this.buffer.substr(i + 1);
this.i = 0;
return this.parse();
}
}
};

/**
* Handles an error
*
* @api private
*/

Parser.prototype.error = function (reason) {
this.buffer = '';
this.i = 0;
this.emit('error', reason);
return this;
};

0 comments on commit 4f293ae

Please sign in to comment.