Skip to content

Commit

Permalink
[minor] Added support different protocols
Browse files Browse the repository at this point in the history
[minor] Added support for binary?
[minor] Start tracking added ids so we can hopefully close everything cleanly
  • Loading branch information
3rd-Eden committed Apr 6, 2013
1 parent fda9de9 commit 1322eef
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 35 deletions.
42 changes: 32 additions & 10 deletions bin/thor
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ cli.usage('[options] ws://localhost')
.option('-A, --amount <connections>', 'the amount of persistent connections to generate', parseInt, 10000)
.option('-C, --concurrent <connections>', 'how many concurrent-connections per second', parseInt, 0)
.option('-M, --messages <messages>', 'messages to be send per connection', parseInt, 1)
.option('-P, --protocol <protocol>', 'WebSocket protocol version', parseInt, 13)
.option('-B, --buffer <size>', 'size of the messages that are send', parseInt, 1024)
.option('-W, --workers <cpus>', 'workers to be spawned', parseInt, os.cpus().length)
.option('-G, --generator <file>', 'custom message generators')
.option('-M, --masked', 'send the messaged with a mask')
.option('-b, --binary', 'send binary messages instead of utf-8')
.version(require('../package.json').version)
.parse(process.argv);

Expand All @@ -37,6 +40,9 @@ if (!cli.args.length) return [
//
var cluster = require('cluster')
, workers = cli.workers || 1
, ids = Object.create(null)
, connections = 0
, received = 0
, robin = [];

cluster.setupMaster({
Expand All @@ -45,7 +51,10 @@ cluster.setupMaster({
, args: [
cli.generator
? path.resolve(process.cwd(), cli.generator)
: path.resolve(__dirname, '../generator.js')
: path.resolve(__dirname, '../generator.js'),
cli.protocol,
!!cli.masked,
!!cli.binary
]
});

Expand All @@ -59,9 +68,18 @@ Object.keys(cluster.workers).forEach(function each(id) {
case 'open':
metrics.handshaken(data);
worker.emit('open::'+ data.id);

// Output the connection progress
if (++connections % 100 === 0) {
console.log(' Opened %s connections', connections);
}

break;

case 'close':
delete ids[data.id];
console.log('dead');

metrics.close(data);
worker.emit('close::'+ data.id);
break;
Expand All @@ -72,9 +90,15 @@ Object.keys(cluster.workers).forEach(function each(id) {
break;

case 'message':
received++;
metrics.message(data);
worker.emit('message::'+ data.id);
}

//
// Check if we have processed all connections so we can quit cleanly.
//
if (!Object.keys(ids).length) process.exit();
});

// Add our worker to our round robin queue so we can balance all our requests
Expand Down Expand Up @@ -120,6 +144,11 @@ async.forEach(cli.args, function forEach(url, done) {
var queue = async.queue(function working(id, fn) {
var worker = robin.shift();

// Register the id, so we can keep track of the connections that we still
// need to process.
ids[id] = 1;

// Process the connections
worker.send({ url: url, size: cli.buffer, messages: cli.messages, id: id });
worker.once('open::'+ id, fn);

Expand All @@ -128,22 +157,15 @@ async.forEach(cli.args, function forEach(url, done) {
}, cli.concurrent || Infinity);

// When all the events are processed successfully we should call.. back ;P
queue.ondrain = done;
queue.drain = done;

// Add all connections to the processing queue;
while (i--) queue.push(url +'::'+ i);
}, function established(err) {
metrics.established();

//
// All the connections have been established, for all urls for all the things
//
console.log('done');
});

process.once('SIGINT', process.exit.bind(process, 0));
process.once('exit', function summary() {
metrics.stop();

console.log(JSON.stringify(metrics.summary(), null, 2));
metrics.stop().summary();
});
29 changes: 19 additions & 10 deletions metrics.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var Stats = require('fast-stats').Stats
, colors = require('colors')
, sugar = require('sugar')
, table = require('tab');

Expand All @@ -25,6 +26,9 @@ function Metrics(requests) {

this.read = 0; // Bytes read
this.send = 0; // Bytes send

// Start tracking
this.start();
}

/**
Expand Down Expand Up @@ -127,8 +131,9 @@ Metrics.prototype.summary = function summary() {
{ label: '' }
]});

results.writeRow(['Time taken', this.timing.duration.duration()]);
results.writeRow(['Connections established', this.timing.established.duration()]);
console.log();
results.writeRow(['Online', this.timing.established + ' milliseconds']);
results.writeRow(['Time taken', this.timing.duration + ' milliseconds']);
results.writeRow(['Connected', this.connections]);
results.writeRow(['Disconnected', this.disconnects]);
results.writeRow(['Failed', this.failures]);
Expand All @@ -152,7 +157,9 @@ Metrics.prototype.summary = function summary() {
width++;

console.log();
console.log('Durations (ms)');
console.log('Durations (ms):');
console.log();

table.emitTable({
columns: [
{ label: '', width: 20 },
Expand All @@ -164,15 +171,15 @@ Metrics.prototype.summary = function summary() {
],
rows: [
[
'Handshaking:',
'Handshaking',
hrange[0].toFixed(),
handshaking.amean().toFixed(),
handshaking.stddev().toFixed(),
handshaking.median().toFixed(),
hrange[1].toFixed()
],
[
'Latency:',
'Latency',
lrange[0].toFixed(),
latency.amean().toFixed(),
latency.stddev().toFixed(),
Expand All @@ -181,9 +188,11 @@ Metrics.prototype.summary = function summary() {
]
]
});

console.log();
console.log('Percentile (ms):');
console.log();

console.log('Durations (ms)');
table.emitTable({
columns: [
{ label: '', width: 20 },
Expand All @@ -199,7 +208,7 @@ Metrics.prototype.summary = function summary() {
],
rows: [
[
'Handshaking:',
'Handshaking',
handshaking.percentile(50).toFixed(),
handshaking.percentile(66).toFixed(),
handshaking.percentile(75).toFixed(),
Expand All @@ -211,7 +220,7 @@ Metrics.prototype.summary = function summary() {
handshaking.percentile(100).toFixed()
],
[
'Latency:',
'Latency',
latency.percentile(50).toFixed(),
latency.percentile(66).toFixed(),
latency.percentile(75).toFixed(),
Expand All @@ -230,13 +239,13 @@ Metrics.prototype.summary = function summary() {
// failed to send a message.
//
if (this.failures) {
console.log();
console.log('Received errors:');
console.log();

Object.keys(this.errors).forEach(function error(err) {
results.writeRow([this.errors[err] +'x', err]);
}, this);

console.log();
}
};

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"commander": "1.1.x",
"async": "0.2.x",
"tab": "0.1.x",
"colors": "0.6.x",
"ws": "git://github.com/3rd-Eden/ws.git",
"sugar": "git://github.com/bluesmoon/node-faststats.git"
},
Expand Down
56 changes: 41 additions & 15 deletions thunderbolt.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,51 @@
'use strict';

var Socket = require('ws')
, collection = [];
, connections = {};

//
// Get the session document that is used to generate the data.
//
var session = require(process.argv[2]);

//
// WebSocket connection details.
//
var masked = process.argv[4] === 'true'
, binary = process.argv[5] === 'true'
, protocol = +process.argv[3] || 13;

process.on('message', function message(task) {
var now = Date.now();

//
// Write a new message to the socket. The message should have a size of x
//
if ('write' in task) collection.forEach(function write(socket) {
write(socket, task);
});
if ('write' in task) {
Object.keys(connections).forEach(function write(id) {
write(connections[id], task, id);
});
}

//
// Shut down every single socket.
//
if (task.shutdown) collection.forEach(function shutdown(socket) {
socket.close();
});
if (task.shutdown) {
Object.keys(connections).forEach(function shutdown(id) {
connections[id].close();
});
}

// End of the line, we are gonna start generating new connections.
if (!task.url) return;

var socket = new Socket(task.url);
var socket = new Socket(task.url, {
protocolVersion: protocol
});

socket.on('open', function open() {
process.send({ type: 'open', duration: Date.now() - now, id: task.id });
write(socket, task);
write(socket, task, task.id);

// As the `close` event is fired after the internal `_socket` is cleaned up
// we need to do some hacky shit in order to tack the bytes send.
Expand All @@ -45,7 +58,7 @@ process.on('message', function message(task) {
});

// Only write as long as we are allowed to send messages
if (--task.messages) write(socket, task);
if (--task.messages) write(socket, task, task.id);
});

socket.on('close', function close() {
Expand All @@ -58,26 +71,39 @@ process.on('message', function message(task) {

socket.on('error', function error(err) {
process.send({ type: 'error', message: err.message, id: task.id });

socket.close();
delete connections[task.id];
});

// Adding a new socket to our socket collection.
collection.push(socket);
connections[task.id] = socket;
});

/**
* Helper function from writing messages to the socket.
*
* @param {WebSocket} socket WebSocket connection we should write to
* @param {Object} task The given task
* @param {String} id
* @param {Function} fn The callback
* @api private
*/
function write(socket, task, fn) {
function write(socket, task, id, fn) {
var start = socket.last = Date.now();

session[task.method || 'utf8'](task.size, function message(err, data) {
socket.send(data, function sending(err) {
if (err) process.send({ type: 'error', message: err.message });
session[binary ? 'binary' : 'utf8'](task.size, function message(err, data) {
socket.send(data, {
binary: binary,
mask: masked
}, function sending(err) {
if (err) {
process.send({ type: 'error', message: err.message });

socket.close();
delete connections[id];
}

if (fn) fn(err);
});
});
Expand Down

0 comments on commit 1322eef

Please sign in to comment.