Skip to content

Commit

Permalink
v2.3
Browse files Browse the repository at this point in the history
  • Loading branch information
kidtronnix committed Jun 12, 2015
1 parent e8c93a0 commit f4431bd
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 25 deletions.
33 changes: 22 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ The data model is based on an [stream processing / event sourcing](http://blog.c
var ML = require('mongo-lambda');

var lambda = new ML.Lambda({
host: 'localhost',
port: 27017,
db: 'lambda-db',
masterColl: "hits"
url: 'mongodb://localhost:27017/hits',
masterColl: "hits",
ttl: 60*60 // 1 hour
});

lambda.reports([{
Expand All @@ -35,7 +34,6 @@ lambda.start(function() {
setInterval(function() {
// Drip data
lambda.insert({ua: "iphone"}, function(err, results) {

// Get batches and live data
Async.parallel({
batches: Async.apply(lambda.batches, 'report4'),
Expand Down Expand Up @@ -71,21 +69,29 @@ lambda.start(function() {

### API

#### `.Lambda(config)`

Returns `lambda` instance. Object that has the methods listed underneath. It is configured with the following object:

- `url`: Mongodb url connection string.
- `masterColl`: Name of master collection.
- `ttl`: *OPTIONAL* The time-to-live for your master collection. Data in speed collections will also expire after this time. If field is absent data will never expire.

#### `.reports(reports)`

Will insert array of reports into system and start new cron job to run using their supplied `agg`. A report has the following structure:

- name: Name of report, used to refer to later.
- agg: [Mongo aggregation pipeline](http://docs.mongodb.org/manual/core/aggregation-pipeline/) array.
- cron: Cron string that defines schedule of when aggregations are run. See [here](https://www.npmjs.com/package/cron) for allowed cron strings.
- timezone: The timezone of the cron job.
- startCron: Whether to start the cron (defaults to true), useful if you want to have separate instances for inserting and getting data.
- `name`: Name of report, used to refer to later.
- `agg`: [Mongo aggregation pipeline](http://docs.mongodb.org/manual/core/aggregation-pipeline/) array.
- `cron`: Cron string that defines schedule of when aggregations are run. See [here](https://www.npmjs.com/package/cron) for allowed cron strings.
- `timezone`: The timezone of the cron job.
- `startCron`: *OPTIONAL* Whether to start the cron (defaults to true), useful if you want to have separate instances for inserting and getting data.
- `lateWindow`: *OPTIONAL* Defines the how late data can arrive before batch is computed, *example* wait 5 mins before the last hour's report is calculated.

#### `.start(callback)`

Starts Lambda instance, initialises cron jobs and mongodb. *NOTE!* This function must be called before you can insert or get data, ie before you can call any of the methods below.


#### `.insert(data, callback)`

Will insert data into batch and speed layer's mongo collection. Accepts data object or array of objects, just like mongodb's native insert. All data points are timestamped with `_ts` field unless already timestamped.
Expand All @@ -98,3 +104,8 @@ Get's batches of data produced by cron job. Callback has following signature: `f

Get's a speed aggregation on speed collection, ie data that has not yet been batched. Callback has following signature: `function(err, speedAgg)`.

#### `.reprocess(report.name, dates, callback)`

Will re-run batch reports according to specified dates. Batches will start from earliest specified date in incrementing chunks until the lastest specified date.


28 changes: 28 additions & 0 deletions examples/reprocess.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
var Wreck = require('wreck');
var ML = require('..');

var lambda = new ML.Lambda({
url: 'mongodb://localhost:27017/mongo-lambda-test',
masterColl: "searches"
});

lambda.reports([{
name: "docCount",
agg: [{ $group: {_id: '$ua', count: { $sum: 1 }}}],
cron: "*/1 * * * *",
timezone: "EST",
startCron: false
}]);

var start = new Date();
var dates = [start];
for(var i = 1; i < 10; i++) {
dates.push(new Date(start.getTime() - i * 1000));
}

lambda.start(function() {
lambda.reprocess('docCount', dates, function() {
console.log('done reprocessing!');
process.exit(0);
});
});
6 changes: 2 additions & 4 deletions lib/batchLayer.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,18 @@ exports.runAgg = internals.runAgg = function(from, to, report, next) {
console.warn("AGGREGATION ERROR: "+err.message);
next(err);
}

var batch = {
report: report.name,
from: from,
to: internals.reports[report.name].lastReportTo,
to: to,
data: data
}

next(err, batch, report);
});
}, report.lateWindow);
}

internals.insertBatch = function(batch, report, next) {
exports.insertBatch = internals.insertBatch = function(batch, report, next) {
mongo.batches[report.name].insert(batch, {w:1}, function(err, doc) {
if (err) {
console.warn("Inserting error: "+err.message);
Expand Down
13 changes: 9 additions & 4 deletions lib/lambda.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,28 @@ internals.Lambda.prototype.speedAgg = function(name, callback) {
internals.Lambda.prototype.reprocess = function(name, dates, callback) {
// TO DO: Validate dates
// TO DO : Validate report exists
dates = dates.sort(function(a,b) {
return a - b;
});

Async.each(dates, function(date, next) {
var date_index = dates.indexOf(date);

if(date_index + 1 < dates.length) {
if(date_index +1< dates.length) {
var from = date;
var to = dates[date_index+1];
var report = Batch.reports[name];

Async.waterfall([
Async.apply(Batch.runAgg, from, to, report),
Batch.insertBatch
], function(err) {
next(err);
});
} else {
next();
}
}, function(err){
callback(err);
}, function(err) {
callback(err);
});
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mongo-lambda",
"version": "2.2.0",
"version": "2.3.0",
"description": "A lambda architecture implementation with simple API for providing mongo's aggregation pipepline reports.",
"main": "index.js",
"scripts": {
Expand Down
7 changes: 7 additions & 0 deletions test/reports.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ var testReports = {
cron: "*/10 * * * * *",
timezone: "EST"
},
reprocessTest: {
name: "reprocessTest",
agg: [{ $group: {_id: '$ua', count: { $sum: 1 }}}],
cron: "*/10 * * * * *",
timezone: "EST",
startCron: false
},
totalTest: {
name: "report6",
agg: [{ $group: {_id: '$ua', count: { $sum: 1 }}}],
Expand Down
42 changes: 37 additions & 5 deletions test/tests/lamda.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ describe('Mongo Lambda', function () {
});
});

it('data expires when ttl is set', { timeout: 60*1000 }, function(done) {
it('correctly adds ttl index', { timeout: 60*1000 }, function(done) {
var conf = {
url: 'mongodb://localhost:27017/mongo-lambda-test',
masterColl: "masterTTL",
Expand All @@ -212,10 +212,6 @@ describe('Mongo Lambda', function () {

var lambda = new ML.Lambda(conf);

var checkResults = function(next) {
setTimeout(function(){
}, 20*1000);
};
lambda.reports([testReports.ttlTest]);

Async.series([
Expand All @@ -238,6 +234,42 @@ describe('Mongo Lambda', function () {

});

it('can reprocess data', { timeout: 60*1000 }, function(done) {
var conf = {
url: 'mongodb://localhost:27017/mongo-lambda-test',
masterColl: "master"
};

var lambda = new ML.Lambda(conf);

lambda.reports([testReports.reprocessTest]);

var start = new Date();
var dates = [start];
for(var i = 1; i < 10; i++) {
dates.push(new Date(start.getTime() - i * 1000));
};

Async.series([
Async.apply(internals.startLambda, lambda),
Async.apply(lambda.reprocess, testReports.reprocessTest.name, dates)
],
function(err, results) {

expect(err).to.not.exist();
var find = {
from: {$gte: dates[0]},
to: {$lte: dates[dates.length -1]}
}
internals.db.collection(testReports.reprocessTest.name+"_batches").find(find).toArray(function(err, docs) {
expect(err).to.not.exist();
expect(docs.length).to.equal(dates.length-1);
done();
});

});

});
it('keeps correct total', { timeout: 60*1000 +1000}, function (done) {
var lambda = new ML.Lambda(config);
lambda.reports([testReports.totalTest]);
Expand Down

0 comments on commit f4431bd

Please sign in to comment.