Skip to content

Commit

Permalink
Add chunked stream transformers. Update hashing model.
Browse files Browse the repository at this point in the history
  • Loading branch information
wehriam committed Feb 20, 2021
1 parent 409b864 commit d690f39
Show file tree
Hide file tree
Showing 16 changed files with 841 additions and 272 deletions.
116 changes: 79 additions & 37 deletions dist/map.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ const CID = require('cids');
const { default: AbortController } = require('abort-controller');
const { streamArray: jsonStreamArray } = require('stream-json/streamers/StreamArray');
const LruCache = require('lru-cache');
const { default: PQueue } = require('p-queue');
const { debounce } = require('lodash');
const asyncIterableToReadableStream = require('async-iterable-to-readable-stream');
const { Readable } = require('stream');
const {
SerializeTransform,
DeserializeTransform,
} = require('@bunchtogether/chunked-stream-transformers');









Expand All @@ -27,12 +33,14 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
* @param {Object} [options={}]
* @param {String} [options.maxAge=5000] Max age of insertion/deletion identifiers
* @param {String} [options.bufferPublishing=20] Interval by which to buffer 'publish' events
* @param {boolean} [options.chunkPubSub=false] Chunk pubsub messages for values greater than 1 MB
*/
constructor(ipfs , topic , entries , options = {}) {
super(entries, options);
if (!ipfs) {
throw new Error("Missing required argument 'ipfs'");
}
this.chunkPubSub = !!options.chunkPubSub;
this.ipfs = ipfs;
this.abortController = new AbortController();
this.topic = topic;
Expand All @@ -41,7 +49,6 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
this.boundHandleQueueMessage = this.handleQueueMessage.bind(this);
this.boundHandleHashMessage = this.handleHashMessage.bind(this);
this.readyPromise = this.initIpfs();
this.remoteHashQueue = [];
this.syncCache = new LruCache(100);
this.peersCache = new LruCache({
max: 100,
Expand All @@ -56,6 +63,44 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
});
this.isLoadingHashes = false;
this.debouncedIpfsSync = debounce(this.ipfsSync.bind(this), 1000);
this.serializeTransform = new SerializeTransform({
autoDestroy: false,
maxChunkSize: 1024 * 512,
});
this.serializeTransform.on('data', async (messageSlice) => {
try {
await this.ipfs.pubsub.publish(this.topic, messageSlice, { signal: this.abortController.signal });
} catch (error) {
if (error.type !== 'aborted') {
this.emit('error', error);
}
}
});
this.serializeTransform.on('error', (error) => {
this.emit('error', error);
});
this.deserializeTransform = new DeserializeTransform({
autoDestroy: false,
timeout: 10000,
});
this.deserializeTransform.on('error', (error) => {
this.emit('error', error);
});
this.deserializeTransform.on('data', async (message) => {
try {
const queue = JSON.parse(message.toString('utf8'));
await this.process(queue);
} catch (error) {
this.emit('error', error);
}
});
this.hashLoadQueue = new PQueue({});
this.hashLoadQueue.on('idle', async () => {
if (this.hasNewPeers) {
this.debouncedIpfsSync();
}
this.emit('hashesloaded');
});
}

/**
Expand All @@ -79,10 +124,13 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di












async initIpfs() {
try {
Expand All @@ -98,12 +146,17 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
if (!this.active) {
return;
}
try {
if (this.chunkPubSub) {
const message = Buffer.from(JSON.stringify(queue));
await this.ipfs.pubsub.publish(this.topic, message, { signal: this.abortController.signal });
} catch (error) {
if (error.type !== 'aborted') {
this.emit('error', error);
this.serializeTransform.write(message);
} else {
try {
const message = Buffer.from(JSON.stringify(queue));
await this.ipfs.pubsub.publish(this.topic, message, { signal: this.abortController.signal });
} catch (error) {
if (error.type !== 'aborted') {
this.emit('error', error);
}
}
}
});
Expand Down Expand Up @@ -156,19 +209,17 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
if (!this.active) {
return;
}


try {
const hash = await this.getIpfsHash();
if (!this.active) {
return;
}
if (!this.syncCache.has(hash, true) || this.hasNewPeers) {
this.hasNewPeers = false;
this.syncCache.set(hash, true);
await this.ipfs.pubsub.publish(`${this.topic}:hash`, Buffer.from(hash, 'utf8'), { signal: this.abortController.signal });
this.emit('hash', hash);
}
this.hasNewPeers = false;
} catch (error) {
if (error.type !== 'aborted') {
this.emit('error', error);
Expand Down Expand Up @@ -240,6 +291,9 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
}
this.abortController.abort();
this.abortController = new AbortController();
await this.deserializeTransform.onIdle();
this.serializeTransform.destroy();
this.deserializeTransform.destroy();
}

handleQueueMessage(message ) {
Expand All @@ -249,11 +303,15 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
if (!this.active) {
return;
}
try {
const queue = JSON.parse(Buffer.from(message.data).toString('utf8'));
this.process(queue);
} catch (error) {
this.emit('error', error);
if (this.chunkPubSub) {
this.deserializeTransform.write(message.data);
} else {
try {
const queue = JSON.parse(Buffer.from(message.data).toString('utf8'));
this.process(queue);
} catch (error) {
this.emit('error', error);
}
}
}

Expand All @@ -269,33 +327,20 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
this.peersCache.set(message.from, true);
}
const remoteHash = Buffer.from(message.data).toString('utf8');
this.remoteHashQueue.push(remoteHash);
this.loadIpfsHashes();
}

async loadIpfsHashes() {
if (this.isLoadingHashes) {
if (this.syncCache.has(remoteHash)) {
return;
}
this.isLoadingHashes = true;
this.syncCache.set(remoteHash, true);
try {
while (this.remoteHashQueue.length > 0 && this.active && this.isLoadingHashes) {
const remoteHash = this.remoteHashQueue.pop();
if (this.syncCache.has(remoteHash)) {
continue;
}
this.syncCache.set(remoteHash, true);
await this.loadIpfsHash(remoteHash);
}
this.hashLoadQueue.add(() => this.loadIpfsHash(remoteHash));
} catch (error) {
this.emit('error', error);
}
this.isLoadingHashes = false;
this.debouncedIpfsSync();
}

async loadIpfsHash(hash ) {
const stream = asyncIterableToReadableStream(this.ipfs.cat(new CID(hash), { timeout: 30000, signal: this.abortController.signal })); // eslint-disable-line new-cap
// $FlowFixMe
const stream = Readable.from(this.ipfs.cat(new CID(hash), { timeout: 30000, signal: this.abortController.signal })); // eslint-disable-line new-cap
const parser = jsonStreamParser();
const streamArray = jsonStreamArray();
const pipeline = stream.pipe(parser);
Expand All @@ -321,15 +366,12 @@ class IpfsObservedRemoveMap extends ObservedRemoveMap { // eslint-di
try {
await new Promise((resolve, reject) => {
stream.on('error', (error) => {
console.error('STREAM ERROR');
reject(error);
});
streamArray.on('error', (error) => {
console.error('STREAM ARRAY ERROR');
reject(error);
});
pipeline.on('error', (error) => {
console.error('PIPELINE ERROR');
reject(error);
});
pipeline.on('end', () => {
Expand Down
Loading

0 comments on commit d690f39

Please sign in to comment.