diff --git a/thunderbolt.js b/thunderbolt.js index 02dc2c9..7271f6d 100644 --- a/thunderbolt.js +++ b/thunderbolt.js @@ -15,13 +15,7 @@ process.on('message', function message(task) { // Write a new message to the socket. The message should have a size of x // if ('write' in task) collection.forEach(function write(socket) { - var start = socket.last = now; - - session[task.method || 'utfh8'](task.size, function message(err, data) { - socket.send(data, function sending(err) { - if (err) process.send({ type: 'error', message: err.message }); - }); - }); + write(socket, task); }); // @@ -38,8 +32,7 @@ process.on('message', function message(task) { socket.on('open', function open() { process.send({ type: 'open', duration: Date.now() - now, id: task.id }); - - write(); + write(socket, task); }); socket.on('message', function message(data) { @@ -48,6 +41,9 @@ process.on('message', function message(task) { length: Buffer.byteLength(data || ''), id: task.id }); + + // Only write as long as we are allowed to send messages + if (--task.messages) write(socket, task); }); socket.on('close', function close() { @@ -58,24 +54,25 @@ process.on('message', function message(task) { process.send({ type: 'error', message: err.message, id: task.id }); }); - - /** - * Helper function from writing messages to the socket. - * - * @api private - */ - function write() { - var start = socket.last = Date.now(); - - session[task.method || 'utf8'](task.size, function message(err, data) { - socket.send(data, function sending(err) { - if (err) process.send({ type: 'error', message: err.message }); - - if (--task.messages) setTimeout(write, task.timeout || 100); - }); - }); - } - // Adding a new socket to our socket collection. collection.push(socket); }); + +/** + * Helper function from writing messages to the socket. + * + * @param {WebSocket} socket WebSocket connection we should write to + * @param {Object} task The given task + * @param {Function} fn The callback + * @api private + */ +function write(socket, task, fn) { + var start = socket.last = Date.now(); + + session[task.method || 'utf8'](task.size, function message(err, data) { + socket.send(data, function sending(err) { + if (err) process.send({ type: 'error', message: err.message }); + if (fn) fn(err); + }); + }); +}