diff --git a/bin/thor b/bin/thor index 7822880..c1c13fd 100755 --- a/bin/thor +++ b/bin/thor @@ -15,9 +15,12 @@ cli.usage('[options] ws://localhost') .option('-A, --amount ', 'the amount of persistent connections to generate', parseInt, 10000) .option('-C, --concurrent ', 'how many concurrent-connections per second', parseInt, 0) .option('-M, --messages ', 'messages to be send per connection', parseInt, 1) + .option('-P, --protocol ', 'WebSocket protocol version', parseInt, 13) .option('-B, --buffer ', 'size of the messages that are send', parseInt, 1024) .option('-W, --workers ', 'workers to be spawned', parseInt, os.cpus().length) .option('-G, --generator ', 'custom message generators') + .option('-M, --masked', 'send the messaged with a mask') + .option('-b, --binary', 'send binary messages instead of utf-8') .version(require('../package.json').version) .parse(process.argv); @@ -37,6 +40,9 @@ if (!cli.args.length) return [ // var cluster = require('cluster') , workers = cli.workers || 1 + , ids = Object.create(null) + , connections = 0 + , received = 0 , robin = []; cluster.setupMaster({ @@ -45,7 +51,10 @@ cluster.setupMaster({ , args: [ cli.generator ? path.resolve(process.cwd(), cli.generator) - : path.resolve(__dirname, '../generator.js') + : path.resolve(__dirname, '../generator.js'), + cli.protocol, + !!cli.masked, + !!cli.binary ] }); @@ -59,9 +68,18 @@ Object.keys(cluster.workers).forEach(function each(id) { case 'open': metrics.handshaken(data); worker.emit('open::'+ data.id); + + // Output the connection progress + if (++connections % 100 === 0) { + console.log(' Opened %s connections', connections); + } + break; case 'close': + delete ids[data.id]; + console.log('dead'); + metrics.close(data); worker.emit('close::'+ data.id); break; @@ -72,9 +90,15 @@ Object.keys(cluster.workers).forEach(function each(id) { break; case 'message': + received++; metrics.message(data); worker.emit('message::'+ data.id); } + + // + // Check if we have processed all connections so we can quit cleanly. + // + if (!Object.keys(ids).length) process.exit(); }); // Add our worker to our round robin queue so we can balance all our requests @@ -120,6 +144,11 @@ async.forEach(cli.args, function forEach(url, done) { var queue = async.queue(function working(id, fn) { var worker = robin.shift(); + // Register the id, so we can keep track of the connections that we still + // need to process. + ids[id] = 1; + + // Process the connections worker.send({ url: url, size: cli.buffer, messages: cli.messages, id: id }); worker.once('open::'+ id, fn); @@ -128,22 +157,15 @@ async.forEach(cli.args, function forEach(url, done) { }, cli.concurrent || Infinity); // When all the events are processed successfully we should call.. back ;P - queue.ondrain = done; + queue.drain = done; // Add all connections to the processing queue; while (i--) queue.push(url +'::'+ i); }, function established(err) { metrics.established(); - - // - // All the connections have been established, for all urls for all the things - // - console.log('done'); }); process.once('SIGINT', process.exit.bind(process, 0)); process.once('exit', function summary() { - metrics.stop(); - - console.log(JSON.stringify(metrics.summary(), null, 2)); + metrics.stop().summary(); }); diff --git a/metrics.js b/metrics.js index 378f06e..58bfd11 100644 --- a/metrics.js +++ b/metrics.js @@ -1,6 +1,7 @@ 'use strict'; var Stats = require('fast-stats').Stats + , colors = require('colors') , sugar = require('sugar') , table = require('tab'); @@ -25,6 +26,9 @@ function Metrics(requests) { this.read = 0; // Bytes read this.send = 0; // Bytes send + + // Start tracking + this.start(); } /** @@ -127,8 +131,9 @@ Metrics.prototype.summary = function summary() { { label: '' } ]}); - results.writeRow(['Time taken', this.timing.duration.duration()]); - results.writeRow(['Connections established', this.timing.established.duration()]); + console.log(); + results.writeRow(['Online', this.timing.established + ' milliseconds']); + results.writeRow(['Time taken', this.timing.duration + ' milliseconds']); results.writeRow(['Connected', this.connections]); results.writeRow(['Disconnected', this.disconnects]); results.writeRow(['Failed', this.failures]); @@ -152,7 +157,9 @@ Metrics.prototype.summary = function summary() { width++; console.log(); - console.log('Durations (ms)'); + console.log('Durations (ms):'); + console.log(); + table.emitTable({ columns: [ { label: '', width: 20 }, @@ -164,7 +171,7 @@ Metrics.prototype.summary = function summary() { ], rows: [ [ - 'Handshaking:', + 'Handshaking', hrange[0].toFixed(), handshaking.amean().toFixed(), handshaking.stddev().toFixed(), @@ -172,7 +179,7 @@ Metrics.prototype.summary = function summary() { hrange[1].toFixed() ], [ - 'Latency:', + 'Latency', lrange[0].toFixed(), latency.amean().toFixed(), latency.stddev().toFixed(), @@ -181,9 +188,11 @@ Metrics.prototype.summary = function summary() { ] ] }); + + console.log(); + console.log('Percentile (ms):'); console.log(); - console.log('Durations (ms)'); table.emitTable({ columns: [ { label: '', width: 20 }, @@ -199,7 +208,7 @@ Metrics.prototype.summary = function summary() { ], rows: [ [ - 'Handshaking:', + 'Handshaking', handshaking.percentile(50).toFixed(), handshaking.percentile(66).toFixed(), handshaking.percentile(75).toFixed(), @@ -211,7 +220,7 @@ Metrics.prototype.summary = function summary() { handshaking.percentile(100).toFixed() ], [ - 'Latency:', + 'Latency', latency.percentile(50).toFixed(), latency.percentile(66).toFixed(), latency.percentile(75).toFixed(), @@ -230,13 +239,13 @@ Metrics.prototype.summary = function summary() { // failed to send a message. // if (this.failures) { + console.log(); console.log('Received errors:'); + console.log(); Object.keys(this.errors).forEach(function error(err) { results.writeRow([this.errors[err] +'x', err]); }, this); - - console.log(); } }; diff --git a/package.json b/package.json index 1816164..1d389f1 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "commander": "1.1.x", "async": "0.2.x", "tab": "0.1.x", + "colors": "0.6.x", "ws": "git://github.com/3rd-Eden/ws.git", "sugar": "git://github.com/bluesmoon/node-faststats.git" }, diff --git a/thunderbolt.js b/thunderbolt.js index fe56508..e1aa535 100644 --- a/thunderbolt.js +++ b/thunderbolt.js @@ -1,38 +1,51 @@ 'use strict'; var Socket = require('ws') - , collection = []; + , connections = {}; // // Get the session document that is used to generate the data. // var session = require(process.argv[2]); +// +// WebSocket connection details. +// +var masked = process.argv[4] === 'true' + , binary = process.argv[5] === 'true' + , protocol = +process.argv[3] || 13; + process.on('message', function message(task) { var now = Date.now(); // // Write a new message to the socket. The message should have a size of x // - if ('write' in task) collection.forEach(function write(socket) { - write(socket, task); - }); + if ('write' in task) { + Object.keys(connections).forEach(function write(id) { + write(connections[id], task, id); + }); + } // // Shut down every single socket. // - if (task.shutdown) collection.forEach(function shutdown(socket) { - socket.close(); - }); + if (task.shutdown) { + Object.keys(connections).forEach(function shutdown(id) { + connections[id].close(); + }); + } // End of the line, we are gonna start generating new connections. if (!task.url) return; - var socket = new Socket(task.url); + var socket = new Socket(task.url, { + protocolVersion: protocol + }); socket.on('open', function open() { process.send({ type: 'open', duration: Date.now() - now, id: task.id }); - write(socket, task); + write(socket, task, task.id); // As the `close` event is fired after the internal `_socket` is cleaned up // we need to do some hacky shit in order to tack the bytes send. @@ -45,7 +58,7 @@ process.on('message', function message(task) { }); // Only write as long as we are allowed to send messages - if (--task.messages) write(socket, task); + if (--task.messages) write(socket, task, task.id); }); socket.on('close', function close() { @@ -58,10 +71,13 @@ process.on('message', function message(task) { socket.on('error', function error(err) { process.send({ type: 'error', message: err.message, id: task.id }); + + socket.close(); + delete connections[task.id]; }); // Adding a new socket to our socket collection. - collection.push(socket); + connections[task.id] = socket; }); /** @@ -69,15 +85,25 @@ process.on('message', function message(task) { * * @param {WebSocket} socket WebSocket connection we should write to * @param {Object} task The given task + * @param {String} id * @param {Function} fn The callback * @api private */ -function write(socket, task, fn) { +function write(socket, task, id, fn) { var start = socket.last = Date.now(); - session[task.method || 'utf8'](task.size, function message(err, data) { - socket.send(data, function sending(err) { - if (err) process.send({ type: 'error', message: err.message }); + session[binary ? 'binary' : 'utf8'](task.size, function message(err, data) { + socket.send(data, { + binary: binary, + mask: masked + }, function sending(err) { + if (err) { + process.send({ type: 'error', message: err.message }); + + socket.close(); + delete connections[id]; + } + if (fn) fn(err); }); });