This repository has been archived by the owner on Feb 9, 2021. It is now read-only.
forked from mhart/kinesalite
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mergeShards.js
118 lines (93 loc) · 4.21 KB
/
mergeShards.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
var BigNumber = require('bignumber.js'),
db = require('../db')
module.exports = function mergeShards(store, data, cb) {
var metaDb = store.metaDb, key = data.StreamName, shardNames = [data.ShardToMerge, data.AdjacentShardToMerge],
shardInfo, shardIds = [], shardIxs = [], i
for (i = 0; i < shardNames.length; i++) {
try {
shardInfo = db.resolveShardId(shardNames[i])
} catch (e) {
return cb(db.clientError('ResourceNotFoundException',
'Could not find shard ' + shardNames[i] + ' in stream ' + key +
' under account ' + metaDb.awsAccountId + '.'))
}
shardIds[i] = shardInfo.shardId
shardIxs[i] = shardInfo.shardIx
}
metaDb.lock(key, function(release) {
cb = release(cb)
store.getStream(key, function(err, stream) {
if (err) return cb(err)
if (stream.StreamStatus != 'ACTIVE') {
return cb(db.clientError('ResourceInUseException',
'Stream ' + data.StreamName + ' under account ' + metaDb.awsAccountId +
' not ACTIVE, instead in state ' + stream.StreamStatus))
}
for (i = 0; i < shardIxs.length; i++) {
if (shardIxs[i] >= stream.Shards.length) {
return cb(db.clientError('ResourceNotFoundException',
'Could not find shard ' + shardIds[i] + ' in stream ' + key +
' under account ' + metaDb.awsAccountId + '.'))
}
}
var shards = [stream.Shards[shardIxs[0]], stream.Shards[shardIxs[1]]]
if (!new BigNumber(shards[0].HashKeyRange.EndingHashKey).plus(1).eq(shards[1].HashKeyRange.StartingHashKey)) {
return cb(db.clientError('InvalidArgumentException',
'Shards ' + shardIds[0] + ' and ' + shardIds[1] + ' in stream ' + key +
' under account ' + metaDb.awsAccountId + ' are not an adjacent pair of shards eligible for merging'))
}
if (stream.StreamStatus != 'ACTIVE') {
return cb(db.clientError('ResourceInUseException',
'Stream ' + key + ' under account ' + metaDb.awsAccountId +
' not ACTIVE, instead in state ' + stream.StreamStatus))
}
stream.StreamStatus = 'UPDATING'
metaDb.put(key, stream, function(err) {
if (err) return cb(err)
setTimeout(function() {
metaDb.lock(key, function(release) {
cb = release(function(err) {
if (err && !/Database is not open/.test(err)) console.error(err.stack || err)
})
store.getStream(key, function(err, stream) {
if (err && err.name == 'NotFoundError') return cb()
if (err) return cb(err)
var now = Date.now()
shards = [stream.Shards[shardIxs[0]], stream.Shards[shardIxs[1]]]
stream.StreamStatus = 'ACTIVE'
shards[0].SequenceNumberRange.EndingSequenceNumber = db.stringifySequence({
shardCreateTime: db.parseSequence(shards[0].SequenceNumberRange.StartingSequenceNumber).shardCreateTime,
shardIx: shardIxs[0],
seqIx: new BigNumber('7fffffffffffffff', 16).toFixed(),
seqTime: now,
})
shards[1].SequenceNumberRange.EndingSequenceNumber = db.stringifySequence({
shardCreateTime: db.parseSequence(shards[1].SequenceNumberRange.StartingSequenceNumber).shardCreateTime,
shardIx: shardIxs[1],
seqIx: new BigNumber('7fffffffffffffff', 16).toFixed(),
seqTime: now,
})
stream.Shards.push({
ParentShardId: shardIds[0],
AdjacentParentShardId: shardIds[1],
HashKeyRange: {
StartingHashKey: shards[0].HashKeyRange.StartingHashKey,
EndingHashKey: shards[1].HashKeyRange.EndingHashKey,
},
SequenceNumberRange: {
StartingSequenceNumber: db.stringifySequence({
shardCreateTime: now + 1000,
shardIx: stream.Shards.length,
}),
},
ShardId: db.shardIdName(stream.Shards.length),
})
metaDb.put(key, stream, cb)
})
})
}, store.updateStreamMs)
cb()
})
})
})
}