forked from mhart/kinesalite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
getRecords.js
121 lines (100 loc) · 4.09 KB
/
getRecords.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
var crypto = require('crypto'),
once = require('once'),
db = require('../db')
module.exports = function getRecords(store, data, cb) {
var metaDb = store.metaDb, shardIx, shardId, iteratorTime, streamName, seqNo, pieces,
buffer = new Buffer(data.ShardIterator, 'base64'), now = Date.now(),
decipher = crypto.createDecipher('aes-256-cbc', db.ITERATOR_PWD)
if (buffer.length < 152 || buffer.length > 280 || buffer.toString('base64') != data.ShardIterator)
return cb(invalidShardIterator())
if (buffer.slice(0, 8).toString('hex') != '0000000000000001')
return cb(invalidShardIterator())
try {
pieces = Buffer.concat([decipher.update(buffer.slice(8)), decipher.final()]).toString('utf8').split('/')
} catch (e) {
return cb(invalidShardIterator())
}
if (pieces.length != 5)
return cb(invalidShardIterator())
iteratorTime = +pieces[0]
streamName = pieces[1]
shardId = pieces[2]
seqNo = pieces[3]
shardIx = parseInt(shardId.split('-')[1])
if (!/^shardId-[\d]{12}$/.test(shardId) || !(shardIx >= 0 && shardIx < 2147483648))
return cb(invalidShardIterator())
if (!(iteratorTime > 0 && iteratorTime < Date.now()))
return cb(invalidShardIterator())
if (!/[a-zA-Z0-9_.-]+/.test(streamName) || !streamName.length || streamName.length > 128)
return cb(invalidShardIterator())
if ((now - iteratorTime) > 300000) {
return cb(db.clientError('ExpiredIteratorException',
'Iterator expired. The iterator was created at time ' + toAmzUtcString(iteratorTime) +
' while right now it is ' + toAmzUtcString(now) + ' which is further in the future than the ' +
'tolerated delay of 300000 milliseconds.'))
}
try {
db.parseSequence(seqNo)
} catch (e) {
return cb(invalidShardIterator())
}
store.getStream(streamName, function(err, stream) {
if (err) {
if (err.name == 'NotFoundError' && err.body) {
err.body.message = 'Shard ' + shardId + ' in stream ' + streamName +
' under account ' + metaDb.awsAccountId + ' does not exist'
}
return cb(err)
}
if (shardIx >= stream.Shards.length) {
return cb(db.clientError('ResourceNotFoundException',
'Shard ' + shardId + ' in stream ' + streamName +
' under account ' + metaDb.awsAccountId + ' does not exist'))
}
cb = once(cb)
var streamDb = store.getStreamDb(streamName), cutoffTime = Date.now() - (24 * 60 * 60 * 1000),
keysToDelete = [], lastItem, opts
opts = {
gte: db.shardIxToHex(shardIx) + '/' + seqNo,
lt: db.shardIxToHex(shardIx + 1),
}
db.lazy(streamDb.createReadStream(opts), cb)
.take(data.Limit || 10000)
.map(function(item) {
lastItem = item.value
lastItem.SequenceNumber = item.key.split('/')[1]
lastItem._seqObj = db.parseSequence(lastItem.SequenceNumber)
lastItem._tooOld = lastItem._seqObj.seqTime < cutoffTime
if (lastItem._tooOld) keysToDelete.push(item.key)
return lastItem
})
.filter(function(item) { return !item._tooOld })
.join(function(items) {
var nextSeq = lastItem ? db.incrementSequence(lastItem._seqObj) : seqNo,
nextShardIterator = db.createShardIterator(streamName, shardId, nextSeq)
cb(null, {
MillisBehindLatest: 0,
NextShardIterator: nextShardIterator,
Records: items.map(function(item) {
delete item._seqObj
delete item._tooOld
return item
}),
})
if (keysToDelete.length) {
// Do this async
streamDb.batch(keysToDelete.map(function(key) { return {type: 'del', key: key} }), function(err) {
if (err && !/Database is not open/.test(err)) console.error(err.stack || err)
})
}
})
})
}
/**
 * Creates the error reported whenever a supplied shard iterator fails
 * validation.
 *
 * @returns {*} whatever `db.clientError` produces for an
 *   `InvalidArgumentException` with the message 'Invalid ShardIterator.'
 */
function invalidShardIterator() {
  var errorType = 'InvalidArgumentException',
      errorMessage = 'Invalid ShardIterator.'
  return db.clientError(errorType, errorMessage)
}
/**
 * Formats a timestamp in the layout used by the expired-iterator error
 * message, e.g. `Thu Jan 22 01:22:02 UTC 2015`.
 *
 * @param {number|string|Date} date - anything accepted by `new Date(...)`
 * @returns {string} `<weekday> <month> <day> <time> UTC <year>`
 */
function toAmzUtcString(date) {
  // toUTCString yields e.g. 'Thu, 22 Jan 2015 01:22:02 GMT'; pull out its
  // fields and re-emit them in a different order.
  var utcText = new Date(date).toUTCString(),
      match = /^(.+), (.+) (.+) (.+) (.+) GMT$/.exec(utcText),
      weekday = match[1],
      day = match[2],
      month = match[3],
      year = match[4],
      time = match[5]

  return weekday + ' ' + month + ' ' + day + ' ' + time + ' UTC ' + year
}