Skip to content
This repository has been archived by the owner on Nov 6, 2019. It is now read-only.

Commit

Permalink
Merge pull request #60 from Santinell/features
Browse files Browse the repository at this point in the history
new features: addQueue and universal worker
  • Loading branch information
scttnlsn committed Dec 4, 2015
2 parents cf0ea80 + 311e193 commit 1d36bc1
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 20 deletions.
23 changes: 14 additions & 9 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ function Connection(uri, options) {
Connection.prototype.worker = function (queues, options) {
var self = this;

if (!Array.isArray(queues)) {
queues = [queues];
}

var queues = queues.map(function (queue) {
if (typeof queue === 'string') {
queue = self.queue(queue);
if (queues === "*") {
var opts = {universal: true, collection: options.collection || 'jobs' };
options.universal = true;
queues = [new Queue('*', opts)];
} else {
if (!Array.isArray(queues)) {
queues = [queues];
}

return queue;
});
var queues = queues.map(function (queue) {
if (typeof queue === 'string') {
queue = self.queue(queue);
}

return queue;
});
}
return new Worker(queues, options);
};

Expand Down
15 changes: 12 additions & 3 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ function Queue(connection, name, options) {

options || (options = {});
options.collection || (options.collection = 'jobs');
options.universal || (options.universal = false);

this.connection = connection;
this.name = name || 'default';
Expand All @@ -35,7 +36,12 @@ Queue.prototype.get = function (id, callback) {
id = new mongoskin.helper.toObjectID(id);
}

this.collection.findOne({ _id: id, queue: this.name }, function (err, data) {
var query = { _id: id };
if (!this.options.universal) {
query.queue = this.name;
}

this.collection.findOne(query, function (err, data) {
if (err) return callback(err);

var job = new Job(self.collection, data);
Expand Down Expand Up @@ -72,14 +78,17 @@ Queue.prototype.dequeue = function (options, callback) {

var query = {
status: Job.QUEUED,
queue: this.name,
delay: { $lte: new Date() }
};

if (!this.options.universal) {
query.queue = this.name;
}

if (options.minPriority !== undefined) {
query.priority = { $gte: options.minPriority };
}

if (options.callbacks !== undefined) {
var callback_names = Object.keys(options.callbacks);
query.name = { $in: callback_names };
Expand Down
15 changes: 10 additions & 5 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ module.exports = Worker;

function Worker(queues, options) {
options || (options = {});
options.queues || (options.queues = []);

this.empty = 0;
this.queues = queues;
this.queues = queues || [];
this.interval = options.interval || 5000;

this.callbacks = options.callbacks || {};
this.strategies = options.strategies || {};
this.universal = options.universal || false;

// Default retry strategies
this.strategies.linear || (this.strategies.linear = linear);
Expand All @@ -22,9 +22,6 @@ function Worker(queues, options) {
// This worker will only process jobs of this priority or higher
this.minPriority = options.minPriority;

if (this.queues.length === 0) {
throw new Error('Worker must have at least one queue.');
}
}

util.inherits(Worker, events.EventEmitter);
Expand All @@ -42,6 +39,9 @@ Worker.prototype.strategies = function (strategies) {
};

Worker.prototype.start = function () {
if (this.queues.length === 0) {
return setTimeout(this.start.bind(this), this.interval);
}
this.working = true;
this.poll();
};
Expand All @@ -65,6 +65,11 @@ Worker.prototype.stop = function (callback) {
this.once('stopped', done);
};

Worker.prototype.addQueue = function (queue) {
if (!this.universal)
this.queues.push(queue);
};

Worker.prototype.poll = function () {
if (!this.working) {
return this.emit('stopped');
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
"mongoskin": "^2.0.0"
},
"devDependencies": {
"async": "0.2.10",
"mocha": "1.18.2",
"sinon": "1.9.0"
"async": "^1.5.0",
"mocha": "^2.3.4",
"sinon": "^1.17.2"
}
}

0 comments on commit 1d36bc1

Please sign in to comment.