Skip to content
This repository has been archived by the owner on Nov 6, 2019. It is now read-only.

Commit

Permalink
Update style and naming
Browse files Browse the repository at this point in the history
  • Loading branch information
scttnlsn committed Mar 23, 2014
1 parent c2df02f commit 854bbcd
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 248 deletions.
40 changes: 21 additions & 19 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ module.exports = Job;

function Job(collection, data) {
this.collection = collection;
if(data){
data.__proto__ = JobData.prototype; //Convert plain object to JobData type

if (data) {
// Convert plain object to JobData type
data.__proto__ = JobData.prototype;
this.data = data;
} else {
this.data = new JobData();
Expand All @@ -21,14 +23,16 @@ Job.prototype.save = function(callback) {
this.collection.save(this.data, function(err, doc) {
if (err) return callback(err);

if (doc && self.data._id === undefined) self.data._id = doc._id;
if (doc && self.data._id === undefined) {
self.data._id = doc._id;
}

callback(null, self);
});
};

Job.prototype.cancel = function(callback){
if(this.data.status === 'queued'){
Job.prototype.cancel = function(callback) {
if (this.data.status === 'queued') {
this.data.status = 'cancelled';
this.data.ended = new Date();
this.save(callback);
Expand All @@ -44,23 +48,24 @@ Job.prototype.complete = function(result, callback) {
this.save(callback);
};

Job.prototype.fail = function(error, callback) {
Job.prototype.fail = function(err, callback) {
var attempts = this.data.attempts;

if(attempts){
attempts.attemptsLeft = (attempts.attemptsLeft || attempts.count) - 1;
if (attempts) {
attempts.remaining = (attempts.remaining || attempts.count) - 1;

if(attempts.attemptsLeft > 0){
if (attempts.remaining > 0) {
this.data.status = 'queued';

var offset = 0;

if(attempts.delayMS != null){
switch(attempts.delayStrategy){
if (attempts.delay !== undefined) {
switch (attempts.strategy) {
case 'exponential':
offset = attempts.delayMS * (attempts.count - attempts.attemptsLeft);
offset = attempts.delay * (attempts.count - attempts.remaining);
break;
default:
offset = attempts.delayMS;
offset = attempts.delay;
break;
}
}
Expand All @@ -74,19 +79,16 @@ Job.prototype.fail = function(error, callback) {
}

this.data.ended = new Date();
this.data.error = error && error.message || error;

if(error && error.stack){
this.data.stack = error.stack;
}
this.data.error = err.message;
this.data.stack = err.stack;

this.save(callback);
};

function JobData() {}

Object.defineProperty(JobData.prototype, 'id', {
get: function(){
get: function() {
return this._id && this._id.toString && this._id.toString();
}
});
125 changes: 56 additions & 69 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ function Queue(connection, name, options) {

this.collection = connection.db.collection(this.options.collection);

if(options.index !== false){
if (options.index !== false) {
this.ensureIndex();
}
}
Expand All @@ -27,88 +27,89 @@ Queue.prototype.job = function(data) {
return new Job(this.collection, data);
};

Queue.prototype.fetchJob = function(id, callback){
if(typeof id === 'string'){
Queue.prototype.fetchJob = function(id, callback) {
if (typeof id === 'string') {
id = new mongoskin.BSONPure.ObjectID(id);
}

var self = this;
this.collection.findOne({ _id: id }, function(err, data){
this.collection.findOne({ _id: id }, function(err, data) {
callback(err, new Job(self.collection, data));
});
};

Queue.prototype.enqueue = function(name, params, options, callback) {
if(!callback && typeof options === 'function') {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}

var jobInfo = {
var info = {
name: name,
params: params,
queue: this.name,
status: 'queued',
enqueued: new Date(),
delay: options.delay || new Date(),
priority: options.priority || 0 //priority must be defaulted to zero for sorting of negative priorities to work

// priority must be defaulted to zero for sorting of negative priorities to work
priority: options.priority || 0
};

var attempts = options.attempts;
if(attempts){
if(typeof attempts !== 'object'){
throw new Error('attempts must be an object');
}

if(attempts.delayMS != null && attempts.delayFunction != null){
throw new Error('Only one of delayFunction and delayMS may be specified');
if (attempts) {
if (typeof attempts !== 'object') {
throw new Error('attempts must be an object');
}

jobInfo.attempts = {};

jobInfo.attempts.count = parseInt(attempts.count, 10);
info.attempts = {};
info.attempts.count = parseInt(attempts.count, 10);

if(attempts.delayMS != null){
jobInfo.attempts.delayMS = parseInt(attempts.delayMS, 10);
if (attempts.delay !== undefined) {
info.attempts.delay = parseInt(attempts.delay, 10);
}

if(attempts.delayStrategy != null){
if(attempts.delayStrategy !== 'exponential'){
throw new Error('Unknown strategy: ' + attempts.delayStrategy);
if (attempts.strategy !== undefined) {
if (attempts.strategy !== 'exponential') {
throw new Error('Unknown strategy: ' + attempts.strategy);
}

if(attempts.delayMS == null){
throw new Error('delayMS must be specified along with delayStrategy');
if (attempts.delay === undefined) {
throw new Error('delay must be specified along with strategy');
}

jobInfo.attempts.delayStrategy = attempts.delayStrategy;
info.attempts.strategy = attempts.strategy;
}
}

if(options.timeoutMS){
jobInfo.timeoutMS = parseInt(options.timeoutMS, 10);
if (options.timeout) {
info.timeout = parseInt(options.timeout, 10);
}

var job = this.job(jobInfo);
var job = this.job(info);

job.save(callback);
};

Queue.prototype.dequeue = function(options, callback) {
var self = this;

if(callback == null){
if (callback === undefined) {
callback = options;
options = {};
}

var query = { status: 'queued', queue: this.name, delay: {$lte: new Date()}};
var query = {
status: 'queued',
queue: this.name,
delay: { $lte: new Date() }
};

if(options.minPriority != null){
if (options.minPriority !== undefined) {
query.priority = { $gte: options.minPriority };
}

//Sorting by id instead of enqueued because jobs inserted within the same millisecond would run in a random order
var sort = [['priority', 'desc'], ['_id', 'asc']];
var update = { '$set': { status: 'dequeued', dequeued: new Date() }};
var options = { new: true };
Expand All @@ -117,12 +118,16 @@ Queue.prototype.dequeue = function(options, callback) {
if (err) return callback(err);
if (!doc) return callback();

if(doc.timeoutMS){
doc.timeoutAt = new Date(new Date() + doc.timeoutMS);
if (doc.timeout) {
doc.expiration = new Date(new Date() + doc.timeout);

self.collection.updateById(doc._id, { $set: { timeoutAt: doc.timeoutAt } }, function(err){
if(err){
console.error("An error occurred setting a job's timeoutAt value:");
var query = {
$set: { expiration: doc.expiration }
};

self.collection.updateById(doc._id, query, function(err) {
if (err) {
console.error("An error occurred setting a job's expiration value:");
console.error(err);
}
});
Expand All @@ -132,52 +137,34 @@ Queue.prototype.dequeue = function(options, callback) {
});
};

Queue.prototype.getTimedOutJob = function(gracePeriod, callback){
var self = this;

var query = {
status: 'dequeued',
queue: this.name,

//Give the worker ample time to process the failure itself first to avoid a race condition
timeoutAt: { $lt: new Date(new Date().getTime() + gracePeriod) }
};
var sort = { timeoutAt: 1 };
var options = { new: true };

//We only set the status to timeout temporarily to prevent the race condition of multiple processes timing out the same job.
//The job will either be failed or retried.
var update = { $set: { status: 'timedout' } };
Queue.prototype.ensureIndex = function() {
var collection = this.collection;

this.collection.findAndModify(query, sort, update, options, function(err, doc){
callback(err, doc ? self.job(doc) : doc);
});
};
// Drop old indexes

Queue.prototype.ensureIndex = function(){
// Drop old index
var collection = this.collection;
this.collection.indexExists('status_1_queue_1_enqueued_1', function(err, indexes) {
if(err) console.error(err);
if(indexes === true) {
if (err) console.error(err);

if (indexes === true) {
collection.dropIndex('status_1_queue_1_enqueued_1', function (err, result) {
if(err) console.error(err);
if (err) console.error(err);
});
}
});

this.collection.indexExists('status_1_queue_1_enqueued_1_delay_1', function(err, indexes) {
if(err) console.error(err);
if(indexes === true) {
if (err) console.error(err);

if (indexes === true) {
collection.dropIndex('status_1_queue_1_enqueued_1_delay_1', function (err, result) {
if(err) console.error(err);
if (err) console.error(err);
});
}
});

//Ensures there's a reasonable index for the poling dequeue
//Status is first b/c querying by status = queued should be very selective
this.collection.ensureIndex({ status: 1, queue: 1, priority: 1, _id: 1, delay: 1 }, function(err){
if(err) console.error(err);
// Ensures there's a reasonable index for the poling dequeue
// Status is first b/c querying by status = queued should be very selective
this.collection.ensureIndex({ status: 1, queue: 1, priority: 1, _id: 1, delay: 1 }, function(err) {
if (err) console.error(err);
});
};
44 changes: 24 additions & 20 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ function Worker(queues, options) {
this.queues = queues;
this.callbacks = options.callbacks || {};
this.interval = options.interval || 5000;
this.minPriority = options.minPriority; //This worker will only process jobs of this priority or higher

// This worker will only process jobs of this priority or higher
this.minPriority = options.minPriority;

if (this.queues.length === 0) {
throw new Error('Worker must have at least one queue.');
Expand All @@ -38,23 +40,25 @@ Worker.prototype.stop = function(callback) {
var wasWorking = this.working;
this.working = false;

//If a callback is passed, call it when the worker has finished its current job
if(callback){
if(wasWorking) {
// If a callback is passed, call it when the worker has finished its current job
if (callback) {
if (wasWorking) {
var doneListener = function() {
self.removeListener('poll-not-working', notWorkingListener); //Remove other listener, as only one or the other will be called
// Remove other listener, as only one or the other will be called
self.removeListener('poll-not-working', notWorkingListener);
callback();
};

var notWorkingListener = function() {
self.removeListener('done', doneListener); //Remove other listener, as only one or the other will be called
// Remove other listener, as only one or the other will be called
self.removeListener('done', doneListener);
callback();
};

//If a job is in progress (or is getting dequeued) when the queue is stopped, wait for it to finish
// If a job is in progress (or is getting dequeued) when the queue is stopped, wait for it to finish
this.once('done', doneListener);

//When stopping this worker, it may currently be polling and not find any jobs to process
// When stopping this worker, it may currently be polling and not find any jobs to process
this.once('poll-not-working', notWorkingListener);
} else {
callback();
Expand Down Expand Up @@ -104,24 +108,24 @@ Worker.prototype.dequeue = function(callback) {

Worker.prototype.work = function(job) {
var self = this;
var finishCalled = false;
var finished = false;

if(job.data.timeoutMS){
var timeoutTimer = setTimeout(function(){
finishedHandler('Timed out');
}, job.data.timeoutMS);
if (job.data.timeout) {
var timer = setTimeout(function() {
done(new Error('Timed out'));
}, job.data.timeout);
}

var finishedHandler = function(err, result) {
//It's possible that this could be called twice in the case that a job times out, but the handler ends up
//finishing later on
if(finishCalled){
function done(err, result) {
// It's possible that this could be called twice in the case that a job times out,
// but the handler ends up finishing later on
if (finished) {
return;
} else {
finishCalled = true;
finished = true;
}

clearTimeout(timeoutTimer);
clearTimeout(timer);
self.emit('done', job.data);

if (err) {
Expand All @@ -141,7 +145,7 @@ Worker.prototype.work = function(job) {
}
};

this.process(job.data, finishedHandler);
this.process(job.data, done);
};

Worker.prototype.process = function(data, callback) {
Expand Down
Loading

0 comments on commit 854bbcd

Please sign in to comment.