Skip to content

Commit

Permalink
Only requie pipe with end set to false for reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
danielstjules committed Aug 17, 2014
1 parent 860592e commit d715a58
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 59 deletions.
11 changes: 8 additions & 3 deletions bin/pjs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ if (paths.length) {
* @return {Stream} The destination stream
*/
function getDestination() {
var prev, splitStream, ignoreStream, stream;
var prev, splitStream, ignoreStream, stream, outputString, opts;
splitStream = split();

// Remove trailing line, and ignore empty lines if required
Expand All @@ -69,19 +69,24 @@ function getDestination() {

// Since we're using object mode, we have to track when to
// output a string if writing to stdout
var outputString = {
outputString = {
filter: !(program.map || program.reduce || program.json),
map: !(program.reduce || program.json),
reduce: !program.json
};

// Stream pipe options
opts = {
reduce: {end: false}
};

// Pipe to each action, when set
['filter', 'map', 'reduce'].forEach(function(action) {
if (program[action]) {
stream = pjs[action](program[action], outputString[action],
program.explicit);

prev.pipe(stream, {end: false});
prev.pipe(stream, opts[action]);
prev = stream;
}
});
Expand Down
22 changes: 1 addition & 21 deletions lib/pjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ exports.filter = function(expression, outputString, explicit) {
};

filterStream = new stream.Transform({objectMode: true});
closeOnEnd(filterStream);

filterStream._transform = function(chunk, encoding, fn) {
if (!include(chunk, i++)) return fn();
if (outputString) chunk = String(chunk) + "\n";
Expand Down Expand Up @@ -75,8 +73,6 @@ exports.map = function(expression, outputString, explicit) {
}

mapStream = new stream.Transform({objectMode: true});
closeOnEnd(mapStream);

mapStream._transform = function(chunk, encoding, fn) {
this.push(update(chunk, i++));
return fn();
Expand All @@ -89,7 +85,7 @@ exports.map = function(expression, outputString, explicit) {
* Accepts an array and an expression to evaluate. It invokes a built-in
* function if expression is one of: length, min, max, sum, avg or concat.
* Otherwise, it evaluates the passed expression, passing it as the callback
* to reduce.
* to reduce. Must be piped to with end set to false.
*
* @param {*[]} array An array to reduce
* @param {string} expression The expression to evaluate
Expand Down Expand Up @@ -141,8 +137,6 @@ exports.ignore = function(ignoreEmpty) {
var emptyFlag, ignoreStream;

ignoreStream = new stream.Transform({objectMode: true});
closeOnEnd(ignoreStream);

ignoreStream._transform = function(chunk, encoding, fn) {
if (emptyFlag) {
this.push('');
Expand Down Expand Up @@ -189,17 +183,3 @@ exports.json = function(streamArray) {

return jsonStream;
};

/**
* Given a stream that was piped to with end set to false, installs the
* necessary handlers to close the pipe on source end.
*
* @param {Stream} The stream to modify
*/
function closeOnEnd(stream) {
stream.on('pipe', function(src) {
src.on('end', function() {
stream.end();
});
});
}
4 changes: 2 additions & 2 deletions spec/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ exports.arrayStream = arrayStream = function(array) {
return readable;
};

exports.testStream = testStream = function(array, transform, fn) {
exports.testStream = testStream = function(array, transform, opts, fn) {
array = array.slice(0);
var readable = arrayStream(array);
var result = [];
Expand All @@ -34,5 +34,5 @@ exports.testStream = testStream = function(array, transform, fn) {
fn(null, result);
});

readable.pipe(transform, {end: false}).pipe(dst);
readable.pipe(transform, opts).pipe(dst);
};
55 changes: 22 additions & 33 deletions spec/pjsSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe('pjs', function() {
it('does not modify the array if the exp is always true', function(done) {
var filter = pjs.filter('true');

testStream(lines, filter, function(err, res) {
testStream(lines, filter, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(lines);
done();
Expand All @@ -20,7 +20,7 @@ describe('pjs', function() {
it('binds any string prototype keys to the line in question', function(done) {
var filter = pjs.filter('length === 3');

testStream(lines, filter, function(err, res) {
testStream(lines, filter, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['foo', 'bar']);
done();
Expand All @@ -30,7 +30,7 @@ describe('pjs', function() {
it("passes the line's index, i, to the function", function(done) {
var filter = pjs.filter('i % 2 === 0');

testStream(lines, filter, function(err, res) {
testStream(lines, filter, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['a', 'foo']);
done();
Expand All @@ -40,7 +40,7 @@ describe('pjs', function() {
it('outputs newline delimited results if outputString is true', function(done) {
var filter = pjs.filter('3 !== length', true);

testStream(lines, filter, function(err, res) {
testStream(lines, filter, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(["a\n", "b\n"]);
done();
Expand All @@ -50,7 +50,7 @@ describe('pjs', function() {
it('requires the line be referenced as "$" if explicit is true', function(done) {
var filter = pjs.filter('$.indexOf("b") !== -1', false, true);

testStream(lines, filter, function(err, res) {
testStream(lines, filter, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['b', 'bar']);
done();
Expand All @@ -62,7 +62,7 @@ describe('pjs', function() {
it('modifies the array with the given expression', function(done) {
var map = pjs.map('"i"');

testStream(lines, map, function(err, res) {
testStream(lines, map, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['i', 'i', 'i', 'i']);
done();
Expand All @@ -72,7 +72,7 @@ describe('pjs', function() {
it('binds any string prototype keys to the line in question', function(done) {
var map = pjs.map('toUpperCase()');

testStream(lines, map, function(err, res) {
testStream(lines, map, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['A', 'B', 'FOO', 'BAR']);
done();
Expand All @@ -82,7 +82,7 @@ describe('pjs', function() {
it("passes the line's index, i, to the function", function(done) {
var map = pjs.map('i');

testStream(lines, map, function(err, res) {
testStream(lines, map, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['0', '1', '2', '3']);
done();
Expand All @@ -92,7 +92,7 @@ describe('pjs', function() {
it('outputs newline delimited results if outputString is true', function(done) {
var map = pjs.map('toLowerCase()', true);

testStream(lines, map, function(err, res) {
testStream(lines, map, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(["a\n", "b\n", "foo\n", "bar\n"]);
done();
Expand All @@ -102,7 +102,7 @@ describe('pjs', function() {
it('requires the line be referenced as "$" if explicit is true', function(done) {
var map = pjs.map('$.charAt(0)', false, true);

testStream(lines, map, function(err, res) {
testStream(lines, map, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['a', 'b', 'f', 'b']);
done();
Expand All @@ -114,7 +114,7 @@ describe('pjs', function() {
it('returns the length when passed as the expression', function(done) {
var reduce = pjs.reduce('length');

testStream([1, 2, 3], reduce, function(err, res) {
testStream([1, 2, 3], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql(3);
done();
Expand All @@ -124,7 +124,7 @@ describe('pjs', function() {
it('returns the min when passed as the expression', function(done) {
var reduce = pjs.reduce('min');

testStream([2, 4, 8], reduce, function(err, res) {
testStream([2, 4, 8], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql(2);
done();
Expand All @@ -134,7 +134,7 @@ describe('pjs', function() {
it('returns the max when passed as the expression', function(done) {
var reduce = pjs.reduce('max');

testStream([2, 4, 8], reduce, function(err, res) {
testStream([2, 4, 8], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql(8);
done();
Expand All @@ -144,7 +144,7 @@ describe('pjs', function() {
it('returns the sum when passed as the expression', function(done) {
var reduce = pjs.reduce('sum');

testStream([1, 2, 3], reduce, function(err, res) {
testStream([1, 2, 3], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql(6);
done();
Expand All @@ -154,7 +154,7 @@ describe('pjs', function() {
it('returns the avg when passed as the expression', function(done) {
var reduce = pjs.reduce('avg');

testStream([1, 2, 3], reduce, function(err, res) {
testStream([1, 2, 3], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql(2);
done();
Expand All @@ -164,7 +164,7 @@ describe('pjs', function() {
it('returns the concatenated string when passed "concat"', function(done) {
var reduce = pjs.reduce('concat');

testStream([1, 2, 3], reduce, function(err, res) {
testStream([1, 2, 3], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql('123');
done();
Expand All @@ -174,7 +174,7 @@ describe('pjs', function() {
it('accepts a custom expression, passing prev and curr', function(done) {
var reduce = pjs.reduce('prev + curr');

testStream([1, 2, 3], reduce, function(err, res) {
testStream([1, 2, 3], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql(6);
done();
Expand All @@ -184,7 +184,7 @@ describe('pjs', function() {
it('accepts a custom expression, also passing i and array', function(done) {
var reduce = pjs.reduce('3 * array[i]');

testStream([1, 2, 3], reduce, function(err, res) {
testStream([1, 2, 3], reduce, {end: false}, function(err, res) {
expect(err).to.be(null);
expect(res[0]).to.eql(9);
done();
Expand All @@ -196,7 +196,7 @@ describe('pjs', function() {
it('ignores the last empty line resulting from split', function(done) {
var ignore = pjs.ignore();

testStream(['a', '', 'c', ''], ignore, function(err, res) {
testStream(['a', '', 'c', ''], ignore, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['a', '', 'c']);
done();
Expand All @@ -206,7 +206,7 @@ describe('pjs', function() {
it('ignores all empty lines when ignoreEmpty is true', function(done) {
var ignore = pjs.ignore(true);

testStream(['a', '', 'c', ''], ignore, function(err, res) {
testStream(['a', '', 'c', ''], ignore, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['a', 'c']);
done();
Expand All @@ -215,19 +215,10 @@ describe('pjs', function() {
});

describe('json', function() {
var closeonEnd = function(stream) {
stream.on('pipe', function(src) {
src.on('end', function() {
stream.end();
});
});
};

it('streams a single object when streamArray is false', function(done) {
var json = pjs.json();
closeonEnd(json);

testStream([{test: 'value'}], json, function(err, res) {
testStream([{test: 'value'}], json, {}, function(err, res) {
expect(err).to.be(null);
expect(res).to.eql(['{"test":"value"}']);
done();
Expand All @@ -237,9 +228,8 @@ describe('pjs', function() {
it('streams a string json array when streamArray is true', function(done) {
var array = [{test: 'object1'}, {test: 'object2'}];
var json = pjs.json(true);
closeonEnd(json);

testStream(array, json, function(err, res) {
testStream(array, json, {}, function(err, res) {
res = res.map(function(buffer) {
return buffer.toString();
});
Expand All @@ -252,7 +242,6 @@ describe('pjs', function() {
'{"test":"object2"}',
'\n]\n'
]);

done();
});
});
Expand Down

0 comments on commit d715a58

Please sign in to comment.