Skip to content
This repository has been archived by the owner on Feb 9, 2021. It is now read-only.

Commit

Permalink
Added basic server and stream creation/deletion/listing validation an…
Browse files Browse the repository at this point in the history
…d functionality
  • Loading branch information
mhart committed Nov 15, 2013
1 parent f39d031 commit 1f6ba13
Show file tree
Hide file tree
Showing 23 changed files with 784 additions and 1 deletion.
70 changes: 70 additions & 0 deletions actions/createStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
var db = require('../db'),
metaDb = db.metaDb

module.exports = function createStream(data, cb) {

var key = data.StreamName

metaDb.lock(key, function(release) {
cb = release(cb)

metaDb.get(key, function(err) {
if (err && err.name != 'NotFoundError') return cb(err)
if (!err) {
err = new Error
err.statusCode = 400
err.body = {
__type: 'com.amazonaws.kinesis.v20130901#ResourceInUseException',
message: '',
}
return cb(err)
}

var i, shards = new Array(data.ShardCount)
for (i = 0; i < data.ShardCount; i++) {
shards[i] = {
AdjacentParentShardId: '',
HashKeyRange: {
EndingHashKey: '',
StartingHashKey: '',
},
ParentShardId: '',
SequenceNumberRange: {
EndingSequenceNumber: '',
StartingSequenceNumber: '',
},
ShardId: (i + 1).toString()
}
}
data = {
IsMoreDataAvailable: false,
Shards: shards,
StreamARN: 'arn:aws:kinesis:<region>:<number>:' + data.StreamName,
StreamName: data.StreamName,
StreamStatus: 'CREATING'
}

metaDb.put(key, data, function(err) {
if (err) return cb(err)

setTimeout(function() {

// Shouldn't need to lock/fetch as nothing should have changed
data.StreamStatus = 'ACTIVE'

metaDb.put(key, data, function(err) {
// TODO: Need to check this
if (err) console.error(err)
})

}, db.createStreamMs)

cb()
})
})
})

}



46 changes: 46 additions & 0 deletions actions/deleteStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
var db = require('../db'),
metaDb = db.metaDb

module.exports = function deleteStream(data, cb) {

var key = data.StreamName

db.getStream(key, false, function(err, stream) {
if (err) return cb(err)

// Check if stream is ACTIVE or not?
if (stream.StreamStatus == 'CREATING') {
err = new Error
err.statusCode = 400
err.body = {
__type: 'com.amazonaws.kinesis.v20130901#ResourceInUseException',
message: 'Attempt to change a resource which is still in use: Stream is being created: ' + key,
}
return cb(err)
}

stream.StreamStatus = 'DELETING'

metaDb.put(key, stream, function(err) {
if (err) return cb(err)

db.deleteStreamDb(key, function(err) {
if (err) return cb(err)

setTimeout(function() {
// TODO: Delete records too
metaDb.del(key, function(err) {
// TODO: Need to check this
if (err) console.error(err)
})
}, db.deleteStreamMs)

cb()
})
})
})

}



13 changes: 13 additions & 0 deletions actions/describeStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
var db = require('../db')

module.exports = function describeStream(data, cb) {

db.getStream(data.StreamName, false, function(err, stream) {
if (err) return cb(err)

cb(null, {StreamDescription: stream})
})
}



Empty file added actions/getNextRecords.js
Empty file.
Empty file added actions/getShardIterator.js
Empty file.
23 changes: 23 additions & 0 deletions actions/listStreams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
var once = require('once'),
db = require('../db'),
metaDb = db.metaDb

module.exports = function listStreams(data, cb) {
cb = once(cb)
var opts, keys

if (data.ExclusiveStartStreamName)
opts = {start: data.ExclusiveStartStreamName + '\x00'}

keys = db.lazy(metaDb.createKeyStream(opts), cb)

if (data.Limit) keys = keys.take(data.Limit)

keys.join(function(names) {
var result = {StreamNames: names}
if (data.Limit) result.IsMoreDataAvailable = true // TODO: fix this
cb(null, result)
})
}


Empty file added actions/mergeShards.js
Empty file.
Empty file added actions/putRecord.js
Empty file.
Empty file added actions/splitShard.js
Empty file.
3 changes: 3 additions & 0 deletions cli.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env node

require('.')().listen(process.env.PORT || 4567)
88 changes: 88 additions & 0 deletions db/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
var lazy = require('lazy'),
levelup = require('levelup'),
MemDown = require('memdown'),
sublevel = require('level-sublevel'),
deleteStream = require('level-delete-stream'),
Lock = require('lock')

var db = sublevel(levelup('./mydb', {db: function(location) { return new MemDown(location) }})),
metaDb = db.sublevel('meta', {valueEncoding: 'json'}),
streamDbs = []

exports.createStreamMs = 500
exports.deleteStreamMs = 500
exports.lazy = lazyStream
exports.metaDb = metaDb
exports.getStreamDb = getStreamDb
exports.deleteStreamDb = deleteStreamDb
exports.getStream = getStream
exports.validationError = validationError
exports.checkConditional = checkConditional

metaDb.lock = new Lock()

function getStreamDb(name) {
if (!streamDbs[name]) {
streamDbs[name] = db.sublevel('stream-' + name, {valueEncoding: 'json'})
streamDbs[name].lock = new Lock()
}
return streamDbs[name]
}

function deleteStreamDb(name, cb) {
var streamDb = streamDbs[name] || db.sublevel('stream-' + name, {valueEncoding: 'json'})
delete streamDbs[name]
itemDb.createKeyStream().pipe(deleteStream(db, cb))
}

function getStream(name, checkStatus, cb) {
if (typeof checkStatus == 'function') cb = checkStatus

streamDb.get(name, function(err, stream) {
if (!err && checkStatus && (stream.StreamStatus == 'CREATING' || stream.StreamStatus == 'DELETING')) {
err = new Error('NotFoundError')
err.name = 'NotFoundError'
}
if (err) {
if (err.name == 'NotFoundError') {
err.statusCode = 400
err.body = {
__type: 'com.amazonaws.kinesis.v20130901#ResourceNotFoundException',
message: 'Requested resource not found',
}
if (!checkStatus) err.body.message += ': Stream: ' + name + ' not found'
}
return cb(err)
}

cb(null, stream)
})
}

function lazyStream(stream, errHandler) {
if (errHandler) stream.on('error', errHandler)
return lazy(stream)
}

function validationError(msg) {
if (msg == null) msg = 'The provided key element does not match the schema'
var err = new Error(msg)
err.statusCode = 400
err.body = {
__type: 'com.amazon.coral.validate#ValidationException',
message: msg,
}
return err
}

function conditionalError(msg) {
if (msg == null) msg = 'The conditional request failed'
var err = new Error(msg)
err.statusCode = 400
err.body = {
__type: 'com.amazonaws.kinesis.v20130901#ConditionalCheckFailedException',
message: msg,
}
return err
}

Loading

0 comments on commit 1f6ba13

Please sign in to comment.