Skip to content

Commit

Permalink
Added main parts of library
Browse files Browse the repository at this point in the history
  • Loading branch information
Valery Zakharchenko committed Jun 21, 2018
0 parents commit aa55439
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 0 deletions.
44 changes: 44 additions & 0 deletions Kinesis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const AWS = require('aws-sdk');
const KinesisObserver = require('./KinesisObserver');
const KinesisProducer = require('./KinesisProducer');
const KinesisConsumer = require('./KinesisConsumer');

class Kinesis {
constructor(config, iteratorManager) {
this.kinesis = new AWS.Kinesis(config);
this.kinsesisObserver = new KinesisObserver();
this.kinesisProducer = new KinesisProducer(this.kinesis);
this.kinesisConsumer = new KinesisConsumer(this.kinesis, iteratorManager);

this.consuming = false;
}

/**
* Listen events by stream name
* @param stream string
* @param listener function
*/
on(stream, listener) {
this.kinsesisObserver.subscribe(stream, listener);


if (!this.consuming) {
this.consuming = true;

this.kinesisConsumer.on(stream, data => this.kinsesisObserver.notify(stream, data));
}
}

/**
* Emit event by stream name
* @param stream string
* @param partitionKey string
* @param data Object
* @return {Promise.<PromiseResult.<D, E>>}
*/
emit(stream, partitionKey, data) {
return this.kinesisProducer.emit(stream, partitionKey, data);
}
}

module.exports = Kinesis;
89 changes: 89 additions & 0 deletions KinesisConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const forever = require('./utils/forever');
const KinesisIteratorManager = require('./managers/KinesisIteratorManager');

class KinesisConsumer {
constructor(kinesis, iteratorManager) {
if (!(iteratorManager instanceof KinesisIteratorManager)) {
throw new Error('IteratorManager should be instance of KinesisIteratorManager');
}

this.kinesis = kinesis;
this.iteratorManager = iteratorManager;
}

/**
* Decode records from base64 buffer and parse json result
* @param records
* @private
*/
_decodeRecords(records) {
return records.map(record => JSON.parse(Buffer.from(record.Data, 'base64').toString('utf8')));
}

/**
* Listen events by stream name
*
* NOTICE: If this method will invoking multiple times -> it will cause that request to kinesis will increase in the
* arithmetic progression.
*
* @param stream
* @param listener
*/
on(stream, listener) {
forever(async (done) => {
try {
// Get description about stream
const { StreamDescription: { Shards } } = await this.kinesis.describeStream({ StreamName: stream }).promise();

// Collect requests for each shard to get records
const promises = Shards.map(async (shard) => {
// If shardIterator is empty -> get iterator from Kinesis
if (!await this.iteratorManager.getIterator({ streamName: stream, shardId: shard.ShardId })) {
const { ShardIterator } = await this.kinesis.getShardIterator({
ShardId: shard.ShardId,
ShardIteratorType: 'TRIM_HORIZON',
StreamName: stream,
}).promise();

await this.iteratorManager.setIterator({
streamName: stream,
shardId: shard.ShardId,
shardIterator: ShardIterator,
});
}

const shardIterator = await this.iteratorManager.getIterator({ streamName: stream, shardId: shard.ShardId });

const {
Records,
NextShardIterator,
} = await this.kinesis.getRecords({ ShardIterator: shardIterator }).promise();

// Set next shard iterator for next stream iterations
await this.iteratorManager.setIterator({
streamName: stream,
shardId: shard.ShardId,
shardIterator: NextShardIterator,
});

// Call callback only if data available in a response
if (Records.length) {
listener(this._decodeRecords(Records));
}
});

await Promise.all(promises);
done();
} catch (e) {
if (!['LimitExceededException', 'ProvisionedThroughputExceededException'].includes(e.name)) {
console.log('======>', e.name);
throw e;
}

done();
}
});
}
}

module.exports = KinesisConsumer;
31 changes: 31 additions & 0 deletions KinesisObserver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
class KinesisObserver {
constructor() {
this.subscribers = {};
}

/**
* Subscribe to notification by key
* @param key
* @param listener
*/
subscribe(key, listener) {
if (!this.subscribers[key]) {
this.subscribers[key] = { listeners: [] };
}

this.subscribers[key].listeners.push(listener);
}

/**
* Notify all subscribers by key
* @param key
* @param data
*/
notify(key, data) {
if (this.subscribers[key] && this.subscribers[key].listeners.length > 0) {
this.subscribers[key].listeners.forEach(listener => listener(data));
}
}
}

module.exports = KinesisObserver;
22 changes: 22 additions & 0 deletions KinesisProducer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
class KinesisProducer {
constructor(kinesis) {
this.kinesis = kinesis;
}

/**
* Emit data to stream
* @param stream
* @param partitionKey
* @param data
* @return {Promise<PromiseResult<D, E>>}
*/
emit(stream, partitionKey, data) {
return this.kinesis.putRecord({
StreamName: stream,
PartitionKey: partitionKey,
Data: JSON.stringify(data),
}).promise();
}
}

module.exports = KinesisProducer;
52 changes: 52 additions & 0 deletions managers/KinesisFileIteratorManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
const fs = require('fs');
const get = require('lodash/get');
const set = require('lodash/set');
const KinesisIteratorManager = require('./KinesisIteratorManager');

class KinesisMemoryIteratorManager extends KinesisIteratorManager {
constructor() {
super('file');

this.iterators = {};
this.filePath = `${process.cwd()}/kinesis_iterators`;
}

_readFile() {
return new Promise((resolve) => {
fs.readFile(this.filePath, (err, data) => {
if (err) {
resolve({});
} else {
resolve(JSON.parse(data));
}
});
});
}

_writeFile(data) {
return new Promise((resolve, reject) => {
fs.writeFile(this.filePath, JSON.stringify(data), (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

async getIterator({ streamName, shardId }) {
const iterators = await this._readFile();

return get(iterators, [this.consumerName, streamName, shardId]);
}

async setIterator({ streamName, shardId, shardIterator }) {
const iterators = await this._readFile();
const updatedIterators = set(iterators, [this.consumerName, streamName, shardId], shardIterator);

await this._writeFile(updatedIterators);
}
}

module.exports = KinesisMemoryIteratorManager;
28 changes: 28 additions & 0 deletions managers/KinesisIteratorManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
class KinesisIteratorManager {
constructor(consumerName) {
this.consumerName = consumerName;
this.iterators = {};
}

/**
* Get shard iterator by consumer name, stream name and shard id
* @param streamName
* @param shardId
* @return string
*/
getIterator({ streamName, shardId }) {
throw new Error('Method getIterator should be implemented');
}

/**
* Store shard iterator by consumer name, stream name and shard id
* @param streamName
* @param shardId
* @param shardIterator
*/
setIterator({ streamName, shardId, shardIterator }) {
throw new Error('Method setIterator should be implemented');
}
}

module.exports = KinesisIteratorManager;
21 changes: 21 additions & 0 deletions managers/KinesisMemoryIteratorManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const get = require('lodash/get');
const set = require('lodash/set');
const KinesisIteratorManager = require('./KinesisIteratorManager');

class KinesisMemoryIteratorManager extends KinesisIteratorManager {
constructor() {
super('memory');

this.iterators = {};
}

getIterator({ streamName, shardId }) {
return get(this.iterators, [this.consumerName, streamName, shardId], null);
}

setIterator({ streamName, shardId, shardIterator }) {
set(this.iterators, [this.consumerName, streamName, shardId], shardIterator);
}
}

module.exports = KinesisMemoryIteratorManager;
9 changes: 9 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "kinesis-node",
"version": "1.0.0",
"description": "Kinesis client (consumer and producer) for Node.js written in pure Node.js",
"main": "index.js",
"repository": "https://github.com/valeryq/kinesis-node.git",
"author": "Valery Zakharchenko",
"license": "MIT"
}
5 changes: 5 additions & 0 deletions utils/forever.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = (fn) => {
const done = () => fn(done);

return done();
};

0 comments on commit aa55439

Please sign in to comment.