Skip to content

Commit

Permalink
[metrics] Adding the metrics collection. This depends on websockets/w…
Browse files Browse the repository at this point in the history
…s#176 for "accurate" information
  • Loading branch information
3rd-Eden committed Apr 5, 2013
1 parent 5a1d672 commit 1517ffb
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 22 deletions.
20 changes: 16 additions & 4 deletions bin/thor
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/env node
'use strict';

var async = require('async')
var Metrics = require('../metrics')
, async = require('async')
, path = require('path')
, os = require('os');

Expand Down Expand Up @@ -56,18 +57,22 @@ Object.keys(cluster.workers).forEach(function each(id) {
worker.on('message', function message(data) {
switch (data.type) {
case 'open':
metrics.handshaken(data);
worker.emit('open::'+ data.id);
break;

case 'close':
metrics.close(data);
worker.emit('close::'+ data.id);
break;

case 'error':
metrics.error(data);
worker.emit('error::'+ data.id);
break;

case 'message':
metrics.message(data);
worker.emit('message::'+ data.id);
}
});
Expand Down Expand Up @@ -96,6 +101,11 @@ Object.keys(cluster.workers).forEach(function each(id) {
console.log(line);
});

//
// Metrics collection.
//
var metrics = new Metrics(cli.amount * cli.args.length);

// Iterate over all the urls so we can target multiple locations at once, which
// is helpfull if you are testing multiple loadbalancer endpoints for example.
async.forEach(cli.args, function forEach(url, done) {
Expand Down Expand Up @@ -123,11 +133,13 @@ async.forEach(cli.args, function forEach(url, done) {
// Add all connections to the processing queue;
while (i--) queue.push(url +'::'+ i);
}, function established(err) {
metrics.established();

//
// All the connections have been established, for all urls for all the things
//
console.log('done');
});

process.once('SIGINT', process.exit.bind(process, 0));
process.once('exit', function summary() {

});
process.once('exit', function summary() { });
91 changes: 75 additions & 16 deletions metrics.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
'use strict';

function Metrics(requests) {
this.requests = requests;
this.requests = requests; // The total amount of requests send

this.connections = 0;
this.disconnects = 0;
this.connections = 0; // Connections established
this.disconnects = 0; // Closed connections
this.failures = 0; // Connections that received an error

this.errors = Object.create(null);
this.timing = Object.create(null);
this.errors = Object.create(null); // Collection of different errors
this.timing = Object.create(null); // Different timings

this.latency = []; // Latencies of the echo'd messages
this.handshaking = []; // Handshake duration

this.read = 0; // Bytes read
this.send = 0; // Bytes send
}

/**
* The metrics has started collecting.
*
* @api private
* @api public
*/
Metrics.prototype.start = function start() {
this.timing.start = Date.now();
Expand All @@ -23,35 +30,87 @@ Metrics.prototype.start = function start() {
/**
* The metrics has stopped collecting.
*
* @api private
* @api public
*/
Metrics.prototype.stop = function stop() {
this.timing.stop = Date.now();
this.timing.duration = this.timing.stop - this.timing.start;
return this;
};

Metrics.prototype.ready = function open() {
/**
* All the connections are established
*
* @api public
*/
Metrics.prototype.established = function established() {
this.timing.ready = Date.now();
this.timing.handshaken = this.timing.ready - this.timing.start;
this.timing.established = this.timing.ready - this.timing.start;
};

/**
* Log an new error.
*
* @param {String} err Error message
* @api private
* @param {Object} data The error
* @api public
*/
Metrics.prototype.error = function error(err) {
if (!err) return this.errors;
Metrics.prototype.error = function error(data) {
this.failures++;

var collection = this.errors[data.message];
if (!collection) this.errors[data.message] = 1;
else this.errors[data.message]++;

return this;
};

var collection = this.errors[err];
if (!collection) this.errors[err] = 1;
else this.errors[err]++;
/**
* Register a message resposne.
*
* @param {Object} data The message details.
* @api public
*/
Metrics.prototype.message = function message(data) {
this.latency.push(data.latency);

return this;
};

/**
* Register a successful handshake + open.
*
* @param {Object} data Handshake details.
* @api public
*/
Metrics.prototype.handshaken = function handshaken(data) {
this.connections++;
this.handshaking.push(data.duration);

return this;
};

/**
* The connection has closed.
*
* @param {Object} data Close information
* @api public
*/
Metrics.prototype.close = function close(data) {
this.disconnections++;
this.read += data.read;
this.send += data.send;

return this;
};

/**
* Generate a summary of the metrics.
*
* @api public
*/
Metrics.prototype.summary = function summary() {
};

//
// Expose the metrics constructor.
//
Expand Down
10 changes: 8 additions & 2 deletions thunderbolt.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ process.on('message', function message(task) {
socket.on('open', function open() {
process.send({ type: 'open', duration: Date.now() - now, id: task.id });
write(socket, task);

// As the `close` event is fired after the internal `_socket` is cleaned up
// we need to do some hacky shit in order to tack the bytes send.
});

socket.on('message', function message(data) {
process.send({
type: 'message', latency: Date.now() - socket.last,
length: Buffer.byteLength(data || ''),
id: task.id
});

Expand All @@ -47,7 +49,11 @@ process.on('message', function message(task) {
});

socket.on('close', function close() {
process.send({ type: 'close', id: task.id });
process.send({
type: 'close', id: task.id,
read: socket._socket.bytesRead,
send: socket._socket.bytesWritten
});
});

socket.on('error', function error(err) {
Expand Down

0 comments on commit 1517ffb

Please sign in to comment.