forked from DaveB93/kinesalite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
248 lines (204 loc) · 8.97 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
var https = require('https'),
http = require('http'),
fs = require('fs'),
path = require('path'),
url = require('url'),
crypto = require('crypto'),
uuid = require('node-uuid'),
validations = require('./validations'),
db = require('./db')
// Largest request body, in bytes, that is buffered before replying with HTTP 413
var MAX_REQUEST_BYTES = 7 * 1024 * 1024

// Service names accepted as the first segment of the X-Amz-Target header
var validApis = ['Kinesis_20131202']

// Operation names accepted as the second segment of the X-Amz-Target header
var validOperations = [
  'AddTagsToStream',
  'CreateStream',
  'DeleteStream',
  'DescribeStream',
  'GetRecords',
  'GetShardIterator',
  'ListStreams',
  'ListTagsForStream',
  'MergeShards',
  'PutRecord',
  'PutRecords',
  'RemoveTagsFromStream',
  'SplitShard',
  'IncreaseStreamRetentionPeriod',
  'DecreaseStreamRetentionPeriod',
]

// Action handlers, keyed by the lower-first form of the operation name
var actions = {}

// Validation definitions, keyed the same way as `actions`
var actionValidations = {}
// The module's export is the server factory function itself
module.exports = kinesalite
/**
 * Create an HTTP (or HTTPS) server whose requests are served from a freshly created store.
 *
 * @param {Object} [options] Passed to `db.create`. When `options.ssl` is truthy the same
 *   object is also given to `https.createServer`, and its `key`, `cert` and `ca` fields are
 *   filled in (on the caller's object) from the PEM files in the `ssl` directory next to
 *   this file if they are not already set.
 * @returns {Object} The server. Its `close` is wrapped so that the store's database is
 *   closed first; after a successful close, `listen` is wrapped so that the store is
 *   recreated before listening again.
 */
function kinesalite(options) {
  options = options || {}
  var server, store = db.create(options), requestHandler = httpHandler.bind(null, store)

  if (options.ssl) {
    options.key = options.key || fs.readFileSync(path.join(__dirname, 'ssl', 'server-key.pem'))
    options.cert = options.cert || fs.readFileSync(path.join(__dirname, 'ssl', 'server-crt.pem'))
    options.ca = options.ca || fs.readFileSync(path.join(__dirname, 'ssl', 'ca-crt.pem'))
    server = https.createServer(options, requestHandler)
  } else {
    server = http.createServer(requestHandler)
  }

  // Ensure we close DB when we're closing the server too
  var httpServerClose = server.close, httpServerListen = server.listen
  server.close = function(cb) {
    store.db.close(function(err) {
      if (err) {
        // The callback is optional, so only invoke it when one was actually supplied;
        // otherwise report the failure as an 'error' event rather than a TypeError
        if (typeof cb == 'function') return cb(err)
        server.emit('error', err)
        return
      }
      // Recreate the store if the user wants to listen again
      server.listen = function() {
        store.recreate()
        // Pass the underlying return value through so `listen(...)` stays chainable
        return httpServerListen.apply(server, arguments)
      }
      httpServerClose.call(server, cb)
    })
    // Mirror the unwrapped `close`, which returns the server
    return server
  }

  return server
}
// Load the handler module and the validation module for every supported operation,
// registering both under the lower-first form of the operation name
validOperations.forEach(function(operation) {
  var key = validations.toLowerFirst(operation)
  var handlerPath = './actions/' + key
  var validationPath = './validations/' + key
  actions[key] = require(handlerPath)
  actionValidations[key] = require(validationPath)
})
/**
 * Finish a response with a raw body.
 *
 * @param {Object} req Request; all of its listeners are removed
 * @param {Object} res Response to write to
 * @param {?string} body Body to send; when null/undefined no Content-Length header is set
 * @param {number} [statusCode] HTTP status, 200 when falsy
 */
function sendRaw(req, res, body, statusCode) {
  var hasBody = body !== null && body !== undefined

  req.removeAllListeners()
  res.statusCode = statusCode ? statusCode : 200

  if (hasBody) {
    var byteCount = Buffer.byteLength(body, 'utf8')
    res.setHeader('Content-Length', byteCount)
  }

  // AWS doesn't send a 'Connection' header but seems to use keep-alive behaviour,
  // so no 'Connection' header is set here
  res.end(body)
}
/**
 * Serialise `data` as JSON and send it via `sendRaw`.
 *
 * @param {Object} req Request, forwarded to `sendRaw`
 * @param {Object} res Response; its `contentType` property is used as the Content-Type header
 * @param {*} data Value to serialise; null/undefined produce an empty body
 * @param {number} [statusCode] HTTP status, forwarded to `sendRaw`
 */
function sendJson(req, res, data, statusCode) {
  var payload = ''
  if (data !== null && data !== undefined) {
    payload = JSON.stringify(data)
  }
  res.setHeader('Content-Type', res.contentType)
  sendRaw(req, res, payload, statusCode)
}
/**
 * Send an error response in the format that matches the request.
 *
 * @param {Object} req Request, forwarded to the sender
 * @param {Object} res Response, forwarded to the sender
 * @param {*} contentValid Truthy to reply with a JSON error and status 400
 * @param {string} type Error type name
 * @param {string|number} msg Error message; when it is a number and `contentValid` is
 *   falsy, it is used as the HTTP status of a self-closing XML element instead
 * @returns {*} Whatever the chosen sender returns
 */
function sendError(req, res, contentValid, type, msg) {
  if (contentValid) {
    return sendJson(req, res, {__type: type, message: msg}, 400)
  }

  if (typeof msg == 'number') {
    return sendRaw(req, res, '<' + type + '/>\n', msg)
  }

  var xml = '<' + type + '>\n <Message>' + msg + '</Message>\n</' + type + '>\n'
  return sendRaw(req, res, xml, 403)
}
// Handle a single HTTP request.
//
// The body is buffered as it arrives (capped at MAX_REQUEST_BYTES). Once the request
// ends, the checks below run in a fixed order and the first one that fails produces the
// response: CORS/OPTIONS handling, content-type + JSON parsing, target (service/operation)
// lookup, presence of auth parameters, then type/custom validation of the parsed data.
// Only if all of them pass is the matching action invoked with the store.
//
// store - passed through unchanged as the first argument of the action handler
// req   - incoming request (headers, method, url and the 'data'/'end'/'error' events are used)
// res   - response; `res.contentType` is set here and read later by sendJson
function httpHandler(store, req, res) {
var body
req.on('error', function(err) { throw err })
req.on('data', function(data) {
// Check the would-be total before concatenating, so an oversized body is never assembled
var newLength = data.length + (body ? body.length : 0)
if (newLength > MAX_REQUEST_BYTES) {
res.setHeader('Transfer-Encoding', 'chunked')
// sendRaw removes all listeners from req, so later 'data'/'end' events are not handled
return sendRaw(req, res, null, 413)
}
body = body ? Buffer.concat([body, data], newLength) : data
})
req.on('end', function() {
body = body ? body.toString() : ''
// All responses after this point have a RequestId
res.setHeader('x-amzn-RequestId', uuid.v1())
// x-amz-id-2 is omitted only for OPTIONS requests that carry an Origin header
if (req.method != 'OPTIONS' || !req.headers.origin)
res.setHeader('x-amz-id-2', crypto.randomBytes(72).toString('base64'))
// FIRST check if we've got an origin header:
if (req.headers.origin) {
res.setHeader('Access-Control-Allow-Origin', '*')
// If it's a valid OPTIONS call, return here
if (req.method == 'OPTIONS') {
// Echo back whatever headers/method the preflight request asked for
if (req.headers['access-control-request-headers'])
res.setHeader('Access-Control-Allow-Headers', req.headers['access-control-request-headers'])
if (req.headers['access-control-request-method'])
res.setHeader('Access-Control-Allow-Methods', req.headers['access-control-request-method'])
res.setHeader('Access-Control-Max-Age', 172800)
return sendRaw(req, res, '')
}
res.setHeader('Access-Control-Expose-Headers', ['x-amz-request-id', 'x-amz-id-2'])
}
// Only the media type is compared; any parameters after ';' are ignored
var contentType = (req.headers['content-type'] || '').split(';')[0].trim()
// `~indexOf(...)` is 0 (falsy) when the value is absent and non-zero (truthy) otherwise
var contentValid = req.method == 'POST' && ~['application/x-amz-json-1.1', 'application/json'].indexOf(contentType)
// X-Amz-Target is read as '<service>.<operation>'
var target = (req.headers['x-amz-target'] || '').split('.')
var service = target[0]
var operation = target[1]
var serviceValid = service && ~validApis.indexOf(service)
var operationValid = operation && ~validOperations.indexOf(operation)
// AWS doesn't seem to care about the HTTP path, so no checking needed for that
// THEN if the method and content-type are ok, see if the JSON parses:
var data
if (contentValid) {
res.contentType = contentType
if (body) {
// A parse failure is deliberately ignored: data stays undefined and is rejected just below
try { data = JSON.parse(body) } catch (e) { }
if (typeof data != 'object' || data == null) {
if (contentType == 'application/json') {
return sendJson(req, res, {
Output: {__type: 'com.amazon.coral.service#SerializationException', Message: null},
Version: '1.0',
}, 200)
}
return sendJson(req, res, {__type: 'SerializationException'}, 400)
}
}
// After this point, application/json doesn't seem to progress any further
if (contentType == 'application/json') {
return sendJson(req, res, {
Output: {__type: 'com.amazon.coral.service#UnknownOperationException', message: null},
Version: '1.0',
}, 200)
}
if (!serviceValid || !operationValid) {
return sendJson(req, res, {__type: 'UnknownOperationException'}, 400)
}
// Reached when the body was empty, so nothing was parsed
if (!data) {
return sendJson(req, res, {__type: 'SerializationException'}, 400)
}
}
// THEN check auth:
// Only the presence of the expected auth parameters is checked in this function
var authHeader = req.headers.authorization
var query = ~req.url.indexOf('?') ? url.parse(req.url, true).query : {}
var authQuery = 'X-Amz-Algorithm' in query
// msg accumulates one sentence per missing parameter; empty means nothing is missing
var msg = '', params
if (authHeader && authQuery) {
return sendError(req, res, contentValid, 'InvalidSignatureException',
'Found both \'X-Amz-Algorithm\' as a query-string param and \'Authorization\' as HTTP header.')
}
if (!authHeader && !authQuery) {
return sendError(req, res, contentValid, 'MissingAuthenticationTokenException', 'Missing Authentication Token')
}
if (authHeader) {
params = ['Credential', 'Signature', 'SignedHeaders']
// Skip the first space/comma-delimited token of the header and collect the remaining
// `key=value` pairs into an object
var authParams = authHeader.split(/,| /).slice(1).filter(Boolean).reduce(function(obj, x) {
var keyVal = x.trim().split('=')
obj[keyVal[0]] = keyVal[1]
return obj
}, {})
params.forEach(function(param) {
if (!authParams[param])
msg += 'Authorization header requires \'' + param + '\' parameter. '
})
if (!req.headers['x-amz-date'] && !req.headers.date)
msg += 'Authorization header requires existence of either a \'X-Amz-Date\' or a \'Date\' header. '
if (msg) msg += 'Authorization=' + authHeader
} else {
params = ['X-Amz-Algorithm', 'X-Amz-Credential', 'X-Amz-Signature', 'X-Amz-SignedHeaders', 'X-Amz-Date']
params.forEach(function(param) {
if (!query[param])
msg += 'AWS query-string parameters must include \'' + param + '\'. '
})
if (msg) msg += 'Re-examine the query-string parameters.'
}
if (msg) {
return sendError(req, res, contentValid, 'IncompleteSignatureException', msg)
}
// THEN if we don't have the correct method + content-type, we'll be exiting here:
if (!contentValid) {
if (!service || !operation) {
return sendError(req, res, false, 'AccessDeniedException',
'Unable to determine service/operation name to be authorized')
} else if (!serviceValid) {
return sendError(req, res, false, 'UnrecognizedClientException',
'No authorization strategy was found for service: ' + service + ', operation: ' + operation)
} else if (!operationValid) {
// A numeric last argument makes sendError use it as the HTTP status
return sendError(req, res, false, 'InternalFailure', 500)
}
return sendError(req, res, false, 'UnknownOperationException', 404)
}
// If we've reached here, we're good to go:
var action = validations.toLowerFirst(operation)
var actionValidation = actionValidations[action]
try {
data = validations.checkTypes(data, actionValidation.types)
validations.checkValidations(data, actionValidation.types, actionValidation.custom, operation)
} catch (e) {
// Errors carrying a statusCode are sent to the client; anything else is rethrown
if (e.statusCode) return sendJson(req, res, e.body, e.statusCode)
throw e
}
actions[action](store, data, function(err, data) {
// Same convention for the action's result: errors with a statusCode become responses
if (err && err.statusCode) return sendJson(req, res, err.body, err.statusCode)
if (err) throw err
sendJson(req, res, data)
})
})
}
// When this file is run directly rather than required, start a server on port 4567
if (require.main === module) {
  kinesalite().listen(4567)
}