Skip to content

Commit

Permalink
mhart#18 adds support for IncreaseStreamRetentionPeriod DecreaseStrea…
Browse files Browse the repository at this point in the history
…mRetentionPeriod

GetRecords honors RetentionPeriodHours when pruning
  • Loading branch information
lauzierj authored and mhart committed Jan 5, 2016
1 parent 9e05f8e commit 3e38b50
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 2 deletions.
42 changes: 42 additions & 0 deletions actions/decreaseStreamRetentionPeriod.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
var db = require('../db')

module.exports = function decreaseStreamRetentionPeriod(store, data, cb) {

var metaDb = store.metaDb

metaDb.lock(data.StreamName, function(release) {
cb = release(cb)

store.getStream(data.StreamName, function(err, stream) {
if (err) return cb(err)

if(data.RetentionPeriodHours < 24) {
return cb(db.clientError('InvalidArgumentException',
'Minimum allowed retention period is 24 hours. Requested retention period (' + data.RetentionPeriodHours +
' hours) is too short.'))
}

if(data.RetentionPeriodHours > 168) {
return cb(db.clientError('InvalidArgumentException',
'Maximum allowed retention period is 168 hours. Requested retention period (' + data.RetentionPeriodHours +
' hours) is too long.'))
}

if(stream.RetentionPeriodHours < data.RetentionPeriodHours) {
return cb(db.clientError('InvalidArgumentException',
'Requested retention period (' + data.RetentionPeriodHours +
' hours) for stream ' + data.StreamName +
' can not be longer than existing retention period (' + stream.RetentionPeriodHours +
' hours). Use IncreaseRetentionPeriod API.'))
}

stream.RetentionPeriodHours = data.RetentionPeriodHours;

metaDb.put(data.StreamName, stream, function(err) {
if (err) return cb(err)

cb()
})
})
})
}
2 changes: 1 addition & 1 deletion actions/getRecords.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ module.exports = function getRecords(store, data, cb) {

cb = once(cb)

var streamDb = store.getStreamDb(streamName), cutoffTime = Date.now() - (24 * 60 * 60 * 1000),
var streamDb = store.getStreamDb(streamName), cutoffTime = Date.now() - (stream.RetentionPeriodHours * 60 * 60 * 1000),
keysToDelete = [], lastItem, opts

opts = {
Expand Down
42 changes: 42 additions & 0 deletions actions/increaseStreamRetentionPeriod.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
var db = require('../db')

module.exports = function increaseStreamRetentionPeriod(store, data, cb) {

var metaDb = store.metaDb

metaDb.lock(data.StreamName, function(release) {
cb = release(cb)

store.getStream(data.StreamName, function(err, stream) {
if (err) return cb(err)

if(data.RetentionPeriodHours < 24) {
return cb(db.clientError('InvalidArgumentException',
'Minimum allowed retention period is 24 hours. Requested retention period (' + data.RetentionPeriodHours +
' hours) is too short.'))
}

if(data.RetentionPeriodHours > 168) {
return cb(db.clientError('InvalidArgumentException',
'Maximum allowed retention period is 168 hours. Requested retention period (' + data.RetentionPeriodHours +
' hours) is too long.'))
}

if(stream.RetentionPeriodHours > data.RetentionPeriodHours) {
return cb(db.clientError('InvalidArgumentException',
'Requested retention period (' + data.RetentionPeriodHours +
' hours) for stream ' + data.StreamName +
' can not be shorter than existing retention period (' + stream.RetentionPeriodHours +
' hours). Use DecreaseRetentionPeriod API.'))
}

stream.RetentionPeriodHours = data.RetentionPeriodHours

metaDb.put(data.StreamName, stream, function(err) {
if (err) return cb(err)

cb()
})
})
})
}
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var MAX_REQUEST_BYTES = 7 * 1024 * 1024
var validApis = ['Kinesis_20131202'],
validOperations = ['AddTagsToStream', 'CreateStream', 'DeleteStream', 'DescribeStream', 'GetRecords',
'GetShardIterator', 'ListStreams', 'ListTagsForStream', 'MergeShards', 'PutRecord', 'PutRecords',
'RemoveTagsFromStream', 'SplitShard'],
'RemoveTagsFromStream', 'SplitShard', 'IncreaseStreamRetentionPeriod', 'DecreaseStreamRetentionPeriod'],
actions = {},
actionValidations = {}

Expand Down
110 changes: 110 additions & 0 deletions test/decreaseStreamRetentionPeriod.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
var helpers = require('./helpers')

require('should');

var target = 'DecreaseStreamRetentionPeriod',
request = helpers.request,
opts = helpers.opts.bind(null, target),
randomName = helpers.randomName,
assertType = helpers.assertType.bind(null, target),
assertValidation = helpers.assertValidation.bind(null, target),
assertNotFound = helpers.assertNotFound.bind(null, target),
assertInvalidArgument = helpers.assertInvalidArgument.bind(null, target)

describe('decreaseStreamRetentionPeriod', function() {

describe('validations', function() {

it('should return ValidationException for no StreamName', function(done) {
assertValidation({},
'2 validation errors detected: ' +
'Value null at \'retentionPeriodHours\' failed to satisfy constraint: ' +
'Member must not be null; ' +
'Value null at \'streamName\' failed to satisfy constraint: ' +
'Member must not be null', done)
})

it('should return ValidationException for empty StreamName', function(done) {
assertValidation({StreamName: '', RetentionPeriodHours: -1},
'3 validation errors detected: ' +
'Value \'-1\' at \'retentionPeriodHours\' failed to satisfy constraint: ' +
'Member must have value greater than or equal to 1; Value \'\' at \'streamName\' ' +
'failed to satisfy constraint: Member must satisfy regular expression pattern: ' +
'[a-zA-Z0-9_.-]+; Value \'\' at \'streamName\' failed to satisfy constraint: ' +
'Member must have length greater than or equal to 1', done)
})

it('should return ValidationException for long StreamName', function(done) {
var name = new Array(129 + 1).join('a')
assertValidation({StreamName: name, RetentionPeriodHours: 24},
'1 validation error detected: ' +
'Value \'' + name + '\' at \'streamName\' failed to satisfy constraint: ' +
'Member must have length less than or equal to 128', done)
})

it('should return InvalidArgumentException for retention period less than 24', function(done) {
var hours = 23
assertInvalidArgument({StreamName: helpers.testStream, RetentionPeriodHours: hours},
'Minimum allowed retention period is 24 hours. ' +
'Requested retention period (' + hours + ' hours) is too short.', done)
})

it('should return InvalidArgumentException for retention period greater than 168', function(done) {
var hours = 169
assertInvalidArgument({StreamName: helpers.testStream, RetentionPeriodHours: hours},
'Maximum allowed retention period is 168 hours. ' +
'Requested retention period (' + hours + ' hours) is too long.', done)
})

it('should return InvalidArgumentException for retention period greater than current', function(done) {
var hours = 25

assertInvalidArgument({StreamName: helpers.testStream, RetentionPeriodHours: hours},
'Requested retention period (' + hours + ' hours) for stream ' + helpers.testStream +
' can not be longer than existing retention period (24 hours).' +
' Use IncreaseRetentionPeriod API.', done)
})

it('should return ResourceNotFoundException if stream does not exist', function(done) {
var name1 = randomName()
assertNotFound({StreamName: name1, RetentionPeriodHours: 25 },
'Stream ' + name1 + ' under account ' + helpers.awsAccountId + ' not found.', done)
})
})

describe('functionality', function() {

it('should decrease stream retention period', function(done) {
request(helpers.opts('IncreaseStreamRetentionPeriod', {
StreamName: helpers.testStream,
RetentionPeriodHours: 25,
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)

var hours = 24
request(opts({
StreamName: helpers.testStream,
RetentionPeriodHours: hours
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)

request(helpers.opts('DescribeStream', {
StreamName: helpers.testStream,
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)

res.body.StreamDescription.RetentionPeriodHours.should.eql(hours)

done();
})
})
})
})

})

})

110 changes: 110 additions & 0 deletions test/increaseStreamRetentionPeriod.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
var helpers = require('./helpers')

require('should');

var target = 'IncreaseStreamRetentionPeriod',
request = helpers.request,
opts = helpers.opts.bind(null, target),
randomName = helpers.randomName,
assertType = helpers.assertType.bind(null, target),
assertValidation = helpers.assertValidation.bind(null, target),
assertNotFound = helpers.assertNotFound.bind(null, target),
assertInvalidArgument = helpers.assertInvalidArgument.bind(null, target)

describe('increaseStreamRetentionPeriod', function() {

describe('validations', function() {

it('should return ValidationException for no StreamName', function(done) {
assertValidation({},
'2 validation errors detected: ' +
'Value null at \'retentionPeriodHours\' failed to satisfy constraint: ' +
'Member must not be null; ' +
'Value null at \'streamName\' failed to satisfy constraint: ' +
'Member must not be null', done)
})

it('should return ValidationException for empty StreamName', function(done) {
assertValidation({StreamName: '', RetentionPeriodHours: -1},
'3 validation errors detected: ' +
'Value \'-1\' at \'retentionPeriodHours\' failed to satisfy constraint: ' +
'Member must have value greater than or equal to 1; Value \'\' at \'streamName\' ' +
'failed to satisfy constraint: Member must satisfy regular expression pattern: ' +
'[a-zA-Z0-9_.-]+; Value \'\' at \'streamName\' failed to satisfy constraint: ' +
'Member must have length greater than or equal to 1', done)
})

it('should return ValidationException for long StreamName', function(done) {
var name = new Array(129 + 1).join('a')
assertValidation({StreamName: name, RetentionPeriodHours: 24},
'1 validation error detected: ' +
'Value \'' + name + '\' at \'streamName\' failed to satisfy constraint: ' +
'Member must have length less than or equal to 128', done)
})

it('should return InvalidArgumentException for retention period less than 24', function(done) {
var hours = 23
assertInvalidArgument({StreamName: helpers.testStream, RetentionPeriodHours: hours},
'Minimum allowed retention period is 24 hours. ' +
'Requested retention period (' + hours + ' hours) is too short.', done)
})

it('should return InvalidArgumentException for retention period greater than 168', function(done) {
var hours = 169
assertInvalidArgument({StreamName: helpers.testStream, RetentionPeriodHours: hours},
'Maximum allowed retention period is 168 hours. ' +
'Requested retention period (' + hours + ' hours) is too long.', done)
})

it('should return InvalidArgumentException for retention period less than current', function(done) {
var hours = 24

request(opts({
StreamName: helpers.testStream,
RetentionPeriodHours: 25,
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)

assertInvalidArgument({StreamName: helpers.testStream, RetentionPeriodHours: hours},
'Requested retention period (' + hours + ' hours) for stream ' + helpers.testStream +
' can not be shorter than existing retention period (25 hours).' +
' Use DecreaseRetentionPeriod API.', done)
})
})

it('should return ResourceNotFoundException if stream does not exist', function(done) {
var name1 = randomName()
assertNotFound({StreamName: name1, RetentionPeriodHours: 25 },
'Stream ' + name1 + ' under account ' + helpers.awsAccountId + ' not found.', done)
})
})

describe('functionality', function() {

it('should increase stream retention period', function(done) {
var hours = 25
request(opts({
StreamName: helpers.testStream,
RetentionPeriodHours: hours,
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)

request(helpers.opts('DescribeStream', {
StreamName: helpers.testStream,
}), function(err, res) {
if (err) return done(err)
res.statusCode.should.equal(200)

res.body.StreamDescription.RetentionPeriodHours.should.eql(hours)

done();
})
})
})

})

})

15 changes: 15 additions & 0 deletions validations/decreaseStreamRetentionPeriod.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
exports.types = {
RetentionPeriodHours: {
type: 'Integer',
notNull: true,
greaterThanOrEqual: 1,
lessThanOrEqual: 10000,
},
StreamName: {
type: 'String',
notNull: true,
regex: '[a-zA-Z0-9_.-]+',
lengthGreaterThanOrEqual: 1,
lengthLessThanOrEqual: 128,
},
}
15 changes: 15 additions & 0 deletions validations/increaseStreamRetentionPeriod.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
exports.types = {
RetentionPeriodHours: {
type: 'Integer',
notNull: true,
greaterThanOrEqual: 1,
lessThanOrEqual: 10000,
},
StreamName: {
type: 'String',
notNull: true,
regex: '[a-zA-Z0-9_.-]+',
lengthGreaterThanOrEqual: 1,
lengthLessThanOrEqual: 128,
},
}

0 comments on commit 3e38b50

Please sign in to comment.