Merge pull request #20 from smaxwellstewart/dev
v2.3
kidtronnix committed Jun 12, 2015
2 parents b0efdbc + f4431bd commit fcdd945
Showing 11 changed files with 180 additions and 103 deletions.
35 changes: 23 additions & 12 deletions README.md
@@ -5,7 +5,7 @@ mongo-lambda

A [lambda architecture](http://www.manning.com/marz/) implementation for mongodb with a simple API for providing mongo's aggregation pipeline reports. Written in javascript and designed as an npm module.

version: ***2.1.x***
version: ***2.2.x***

### Data Model

@@ -17,10 +17,9 @@ The data model is based on a [stream processing / event sourcing](http://blog.c
var ML = require('mongo-lambda');

var lambda = new ML.Lambda({
host: 'localhost',
port: 27017,
db: 'lambda-db',
masterColl: "hits"
url: 'mongodb://localhost:27017/hits',
masterColl: "hits",
ttl: 60*60 // 1 hour
});

lambda.reports([{
@@ -35,7 +34,6 @@ lambda.start(function() {
setInterval(function() {
// Drip data
lambda.insert({ua: "iphone"}, function(err, results) {

// Get batches and live data
Async.parallel({
batches: Async.apply(lambda.batches, 'report4'),
@@ -71,21 +69,29 @@

### API

#### `.Lambda(config)`

Returns a `lambda` instance, an object exposing the methods listed below. It is configured with the following options (see the sketch after this list):

- `url`: Mongodb url connection string.
- `masterColl`: Name of master collection.
- `ttl`: *OPTIONAL* The time-to-live for your master collection. Data in speed collections will also expire after this time. If this field is absent, data never expires.
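
A minimal configuration sketch (the database name is illustrative; `ttl` is assumed to be in seconds, as in the example above):

```js
var ML = require('mongo-lambda');

var lambda = new ML.Lambda({
    url: 'mongodb://localhost:27017/lambda-db', // MongoDB connection string
    masterColl: "hits",                         // master collection name
    ttl: 60*60                                  // optional: expire data after 1 hour
});
```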

#### `.reports(reports)`

Inserts an array of reports into the system and starts a new cron job for each one that runs its supplied `agg`. A report has the following structure (see the sketch after this list):

- name: Name of report, used to refer to later.
- agg: [Mongo aggregation pipeline](http://docs.mongodb.org/manual/core/aggregation-pipeline/) array.
- cron: Cron string that defines schedule of when aggregations are run. See [here](https://www.npmjs.com/package/cron) for allowed cron strings.
- timezone: The timezone of the cron job.
- startCron: Whether to start the cron (defaults to true), useful if you want to have separate instances for inserting and getting data.
- `name`: Name of report, used to refer to later.
- `agg`: [Mongo aggregation pipeline](http://docs.mongodb.org/manual/core/aggregation-pipeline/) array.
- `cron`: Cron string that defines schedule of when aggregations are run. See [here](https://www.npmjs.com/package/cron) for allowed cron strings.
- `timezone`: The timezone of the cron job.
- `startCron`: *OPTIONAL* Whether to start the cron (defaults to true), useful if you want to have separate instances for inserting and getting data.
- `lateWindow`: *OPTIONAL* Defines how late data can arrive before the batch is computed, *example:* wait 5 mins before the last hour's report is calculated.
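
For example, a report that counts hits per user agent might be defined as in the sketch below (the name and schedule are illustrative; `lateWindow` is assumed to be in milliseconds, since it is handed to a `setTimeout` internally):

```js
lambda.reports([{
    name: "hitsPerUA",
    agg: [{ $group: { _id: '$ua', hits: { $sum: 1 } } }],
    cron: "0 * * * *",          // run at the top of every hour
    timezone: "EST",
    startCron: true,            // optional, defaults to true
    lateWindow: 5*60*1000       // optional: wait 5 minutes for late-arriving data
}]);
```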

#### `.start(callback)`

Starts the Lambda instance, initialising cron jobs and the mongodb connection. *NOTE!* This function must be called before you can insert or get data, i.e. before you can call any of the methods below.


#### `.insert(data, callback)`

Inserts data into the batch and speed layers' mongo collections. Accepts a data object or an array of objects, just like mongodb's native insert. All data points are timestamped with a `_ts` field unless already timestamped.
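
A usage sketch (the documents are illustrative):

```js
// Single document: a `_ts` timestamp is added automatically
lambda.insert({ua: "iphone"}, function(err, results) {
    if (err) { console.warn(err.message); }
});

// Array of documents: the second one keeps the `_ts` it already carries
lambda.insert([{ua: "iphone"}, {ua: "android", _ts: new Date()}], function(err, results) {
    // ...
});
```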
@@ -98,3 +104,8 @@ Gets batches of data produced by the cron job. Callback has the following signature: `f

Gets a speed aggregation on the speed collection, i.e. data that has not yet been batched. The callback has the following signature: `function(err, speedAgg)`.
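
Batches and the speed aggregation are typically fetched together and merged into a complete view, as in the sketch below (assuming the hypothetical `hitsPerUA` report from above and the `async` module):

```js
var Async = require('async');

Async.parallel({
    batches: Async.apply(lambda.batches, 'hitsPerUA'),
    onTheFly: Async.apply(lambda.speedAgg, 'hitsPerUA')
}, function(err, results) {
    // results.batches  -> [{ report: 'hitsPerUA', from: ..., to: ..., data: [...] }, ...]
    // results.onTheFly -> aggregation over data not yet in a batch
});
```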

#### `.reprocess(report.name, dates, callback)`

Re-runs batch reports according to the specified dates. Batches start from the earliest specified date and proceed in incrementing chunks until the latest specified date.


4 changes: 1 addition & 3 deletions examples/basic.js
@@ -1,9 +1,7 @@
var ML = require('..');

var lambda = new ML.Lambda({
host: 'localhost',
port: 27017,
db: 'docs',
url: 'mongodb://localhost:27017/mongo-lambda-test',
masterColl: "searches"
});

8 changes: 3 additions & 5 deletions examples/fapsearches.js
@@ -2,9 +2,7 @@ var Wreck = require('wreck');
var ML = require('..');

var lambda = new ML.Lambda({
host: 'localhost',
port: 27017,
db: 'faps',
url: 'mongodb://localhost:27017/mongo-lambda-test',
masterColl: "searches"
});

@@ -27,7 +25,7 @@ lambda.reports([{
$sort : { hits : -1}
}
],
cron: "0 * * * *",
cron: "*/1 * * * *",
timezone: "EST"
}]);

@@ -57,4 +55,4 @@ var batchAgg = [
{ $unwind: '$data' },
{ $match : { "data._id" : /god/ } },
{ $project: { _id: "$data._id", hits: "$data.hits", s: "$data.s", t: "$data.t", g: "$data.g"}}
];
];
30 changes: 0 additions & 30 deletions examples/imps.js

This file was deleted.

28 changes: 28 additions & 0 deletions examples/reprocess.js
@@ -0,0 +1,28 @@
var Wreck = require('wreck');
var ML = require('..');

var lambda = new ML.Lambda({
url: 'mongodb://localhost:27017/mongo-lambda-test',
masterColl: "searches"
});

lambda.reports([{
name: "docCount",
agg: [{ $group: {_id: '$ua', count: { $sum: 1 }}}],
cron: "*/1 * * * *",
timezone: "EST",
startCron: false
}]);

var start = new Date();
var dates = [start];
for(var i = 1; i < 10; i++) {
dates.push(new Date(start.getTime() - i * 1000));
}

lambda.start(function() {
lambda.reprocess('docCount', dates, function() {
console.log('done reprocessing!');
process.exit(0);
});
});
50 changes: 26 additions & 24 deletions lib/batchLayer.js
@@ -69,46 +69,50 @@ exports.insertReports = function(reports, next) {
})
}

internals.runAgg = function(report, next) {

    // Amend agg
internals.getFromTo = function(report, next) {
var from = internals.reports[report.name].lastReportTo;
report.agg.unshift({ $match: {'_ts': {$gte: from }}});
module.exports.reports[report.name].lastReportTo = internals.reports[report.name].lastReportTo = new Date();

var to = module.exports.reports[report.name].lastReportTo = internals.reports[report.name].lastReportTo = new Date();

next(null, from, to, report);
}

exports.runAgg = internals.runAgg = function(from, to, report, next) {
var agg = report.agg.slice();
agg.unshift({ $match: {'_ts': {$gt: from, $lte: to }}});
setTimeout(function(){
mongo.master.aggregate(report.agg, function(err, data) {
mongo.master.aggregate(agg, function(err, data) {
if (err) {
console.warn("AGGREGATION ERROR: "+err.message);
next(err);
}

var batch = {
report: report.name,
from: from,
to: internals.reports[report.name].lastReportTo,
to: to,
data: data
}
next(err, batch);
next(err, batch, report);
});
}, report.lateWindow);
}

exports.runReport = function(report, next) {

Async.series({
agg: Async.apply(internals.runAgg, report)
}, function (err, result) {
var batch = {
report: report.name,
from: result.agg.from,
to: result.agg.to,
data: result.agg.data
}
exports.insertBatch = internals.insertBatch = function(batch, report, next) {
mongo.batches[report.name].insert(batch, {w:1}, function(err, doc) {
if (err) {
console.warn(err.message);
console.warn("Inserting error: "+err.message);
}
next(err, doc[0]);
});
}

exports.runReport = function(report, next) {

Async.waterfall([
Async.apply(internals.getFromTo, report),
internals.runAgg,
internals.insertBatch
], function (err) {
next(err);
});
}

@@ -125,12 +129,10 @@ exports.getBatches = function(name, query, next) {
// if(query.to) {
// q.to = { $lte: query.to }
// }

mongo.batches[name].find(q).sort({'to': 1}).toArray(function(err, batches) {
if (err) {
console.warn("ERROR LOOKING UP BATCHES: "+err.message);
}

next(err, batches);
});
}
29 changes: 28 additions & 1 deletion lib/lambda.js
@@ -68,11 +68,38 @@ internals.Lambda.prototype.speedAgg = function(name, callback) {
}


internals.Lambda.prototype.reprocess = function(name, dates, callback) {
// TO DO: Validate dates
// TO DO : Validate report exists
dates = dates.sort(function(a,b) {
return a - b;
});

Async.each(dates, function(date, next) {
var date_index = dates.indexOf(date);

if(date_index +1< dates.length) {
var from = date;
var to = dates[date_index+1];
var report = Batch.reports[name];
Async.waterfall([
Async.apply(Batch.runAgg, from, to, report),
Batch.insertBatch
], function(err) {
next(err);
});
} else {
next();
}
}, function(err) {
callback(err);
});
}

internals.Lambda.prototype.start = function(callback) {
Async.series({
initBatch: Async.apply(Batch.init, internals.options),
initDb: Async.apply(mongo.init, internals.options),
initBatch: Async.apply(Batch.init, internals.options),
initJR: Async.apply(JobRunner.init, Batch._reports)
}, function(err, results) {
if (err) {
2 changes: 0 additions & 2 deletions lib/speedLayer.js
@@ -19,7 +19,6 @@ exports.scrubData = function(report, next) {
var query = {'_ts': {
$lt: Batch.reports[report.name].lastReportTo
} };

mongo.speed[report.name].remove(query, {w:1}, function(err, numDocs) {
if (err) {
console.warn("ERROR SCRUBBING SPEED: ", err);
@@ -52,7 +51,6 @@ exports.getOnTheFly = function(name, query, next) {
if (err) {
console.warn("AGGREGATION ERROR: "+err.message);
}

next(err, data);
});
}
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "mongo-lambda",
"version": "2.1.0",
"version": "2.3.0",
"description": "A lambda architecture implementation with simple API for providing mongo's aggregation pipepline reports.",
"main": "index.js",
"scripts": {
7 changes: 7 additions & 0 deletions test/reports.js
@@ -30,6 +30,13 @@ var testReports = {
cron: "*/10 * * * * *",
timezone: "EST"
},
reprocessTest: {
name: "reprocessTest",
agg: [{ $group: {_id: '$ua', count: { $sum: 1 }}}],
cron: "*/10 * * * * *",
timezone: "EST",
startCron: false
},
totalTest: {
name: "report6",
agg: [{ $group: {_id: '$ua', count: { $sum: 1 }}}],
