This repository has been archived by the owner on Feb 9, 2021. It is now read-only.
forked from mhart/kinesalite
-
Notifications
You must be signed in to change notification settings - Fork 1
/
createStream.js
82 lines (67 loc) · 3.13 KB
/
createStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
var BigNumber = require('bignumber.js'),
db = require('../db')
// 2^128 as a BigNumber; createStream divides it by the shard count to size each
// shard's hash key range, and the last shard's range ends at this value minus one
var POW_128 = new BigNumber(2).pow(128)

// Milliseconds subtracted from the current time to get a new stream's creation time
var SEQ_ADJUST_MS = 2000
module.exports = function createStream(store, data, cb) {
var key = data.StreamName, metaDb = store.metaDb
metaDb.lock(key, function(release) {
cb = release(cb)
metaDb.get(key, function(err) {
if (err && err.name != 'NotFoundError') return cb(err)
if (!err)
return cb(db.clientError('ResourceInUseException',
'Stream ' + key + ' under account ' + metaDb.awsAccountId + ' already exists.'))
db.sumShards(store, function(err, shardSum) {
if (err) return cb(err)
if (shardSum + data.ShardCount > store.shardLimit) {
return cb(db.clientError('LimitExceededException',
'This request would exceed the shard limit for the account ' + metaDb.awsAccountId + ' in ' +
metaDb.awsRegion + '. Current shard count for the account: ' + shardSum +
'. Limit: ' + store.shardLimit + '. Number of additional shards that would have ' +
'resulted from this request: ' + data.ShardCount + '. Refer to the AWS Service Limits page ' +
'(http://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) ' +
'for current limits and how to request higher limits.'))
}
var i, shards = new Array(data.ShardCount), shardHash = POW_128.div(data.ShardCount).integerValue(BigNumber.ROUND_FLOOR),
createTime = Date.now() - SEQ_ADJUST_MS, stream
for (i = 0; i < data.ShardCount; i++) {
shards[i] = {
HashKeyRange: {
StartingHashKey: shardHash.times(i).toFixed(),
EndingHashKey: (i < data.ShardCount - 1 ? shardHash.times(i + 1) : POW_128).minus(1).toFixed(),
},
SequenceNumberRange: {
StartingSequenceNumber: db.stringifySequence({shardCreateTime: createTime, shardIx: i}),
},
ShardId: db.shardIdName(i),
}
}
stream = {
RetentionPeriodHours: 24,
EnhancedMonitoring: [{ShardLevelMetrics: []}],
EncryptionType: 'NONE',
HasMoreShards: false,
Shards: [],
StreamARN: 'arn:aws:kinesis:' + metaDb.awsRegion + ':' + metaDb.awsAccountId + ':stream/' + data.StreamName,
StreamName: data.StreamName,
StreamStatus: 'CREATING',
StreamCreationTimestamp: Math.floor(createTime / 1000),
_seqIx: new Array(Math.ceil(data.ShardCount / 5)), // Hidden data, remove when returning
_tags: Object.create(null), // Hidden data, remove when returning
}
metaDb.put(key, stream, function(err) {
if (err) return cb(err)
setTimeout(function() {
// Shouldn't need to lock/fetch as nothing should have changed
stream.StreamStatus = 'ACTIVE'
stream.Shards = shards
metaDb.put(key, stream, function(err) {
if (err && !/Database is not open/.test(err)) console.error(err.stack || err)
})
}, store.createStreamMs)
cb()
})
})
})
})
}