Skip to content

Commit

Permalink
Merge pull request coinbase#5 from coinbase/product-splitter
Browse files Browse the repository at this point in the history
Add ProductSplitter
  • Loading branch information
CjS77 authored Aug 31, 2017
2 parents 26d74c0 + 0d4564d commit 585db0c
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 5 deletions.
8 changes: 3 additions & 5 deletions src/core/LiveOrderbook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
SnapshotMessage,
TickerMessage
} from './Messages';
import { Transform } from 'stream';
import { Duplex } from 'stream';

export interface LiveBookConfig {
product: string;
Expand All @@ -50,7 +50,7 @@ export interface SkippedMessageEvent {
* A live orderbook. This class maintains the state of an orderbook (using BookBuilder) in realtime by responding to
* messages from attached feeds.
*/
export class LiveOrderbook extends Transform implements Orderbook {
export class LiveOrderbook extends Duplex implements Orderbook {
public readonly product: string;
public readonly baseCurrency: string;
public readonly quoteCurrency: string;
Expand Down Expand Up @@ -149,9 +149,7 @@ export class LiveOrderbook extends Transform implements Orderbook {
return this._book.ordersForValue(side, Big(value), useQuote, startPrice);
}

protected _transform(msg: any, encoding: string, callback: (err: Error, msg: any) => void) {
callback(null, msg);
}
protected _read() { /* no-op */ }

protected _write(msg: any, encoding: string, callback: () => void): void {
// Pass the msg on to downstream users
Expand Down
37 changes: 37 additions & 0 deletions src/core/ProductSplitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/***************************************************************************************************************************
* @license *
* Copyright 2017 Coinbase, Inc. *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on *
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the *
* License for the specific language governing permissions and limitations under the License. *
***************************************************************************************************************************/
import { StreamCopier } from '../lib/StreamCopier';
import { ExchangeFeed } from '../exchanges/ExchangeFeed';

export class ProductSplitter extends StreamCopier {
private products: string[];

constructor(feed: ExchangeFeed, productIds: string[]) {
const numProducts = productIds.length;
super(feed, numProducts + 1);
this.products = productIds;
this.init();
}

private init() {
this.products.forEach((product: string) => {
this.attach(product);
this.addFilter(product, (msg: any) => {
return msg && msg.productId && msg.productId === product;
});
});
// Add a raw feed channel for other filters to connect to
this.attach('raw');
}
}
1 change: 1 addition & 0 deletions src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from './ProductFilter';
export * from './RateLimiter';
export * from './Trader';
export * from './Triggers';
export * from './ProductSplitter';
159 changes: 159 additions & 0 deletions src/lib/StreamCopier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/***************************************************************************************************************************
* @license *
* Copyright 2017 Coinbase, Inc. *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on *
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the *
* License for the specific language governing permissions and limitations under the License. *
***************************************************************************************************************************/

import { PassThrough, Readable, Writable } from 'stream';
import * as assert from 'assert';
import { EventEmitter } from 'events';

/**
* Clones a (readable) stream to multiple (writable) streams. Manages backpressure for each output stream, though if
* one stream is much slower than the others you could get large memory consumption. There's no way around this, except
* stopping the feed to all the other streams too.
*
* Accepts an optional filter function that determines whether a message goes to any particular output stream (aka a devil's
* trapdoor).
*/
export class StreamCopier extends EventEmitter {
private feed: Readable;
private outputs: StreamConnection[];
private bufferStreams: PassThrough[];
private numConnected: number;
private numOutputs: number;

/**
* Create a new StreamCopier. `numOutputs` reserves that many outward conections for streams and will buffer messages
* until a downstream consumer is attached.
*
* The `options` parameter is passed onto the Writable streams' constructor and defaults to `objectMode: true` if
* omitted.
*/
constructor(feed: Readable, numOutputs: number, options?: any) {
super();
this.feed = feed;
this.outputs = [];
this.bufferStreams = new Array(numOutputs);
options = options || { objectMode: true };
for (let i = 0; i < numOutputs; i++) {
this.bufferStreams[i] = new PassThrough(options);
this.bufferStreams[i].on('error', (err: Error) => {
this.emit('error', err);
});
}
this.numConnected = 0;
this.numOutputs = numOutputs;
feed.on('readable', () => {
let msg;
// tslint:disable-next-line no-conditional-assignment
while (null !== (msg = feed.read())) {
this.relayMessages(msg);
}
});
}

/**
* Return the number of output streams conected to the feed
*/
get numConnections(): number {
return this.numConnected;
}

/**
* Attachs new output stream to the feed and assign it the given id
* Returns true if the attachement was a success, false otherwise
*/
attach(id: string): boolean {
const numConnections = this.numConnections;
if (numConnections >= this.numOutputs) {
return false;
}
this.outputs[numConnections] = {
id: id,
index: numConnections,
stream: null,
filters: []
};
this.numConnected++;
return true;
}

pipe(id: string, stream: Writable): boolean {
const connection: StreamConnection = this.findConnection(id);
if (!connection) {
return false;
}
if (connection.stream !== null) {
return false;
}
const buffer = this.bufferStreams[connection.index];
assert(buffer instanceof PassThrough);
buffer.pipe(stream);
return true;
}

/**
*
* @param {string} id
* @returns {"stream".internal.Writable}
*/
unpipe(id: string): Writable {
const output = this.findConnection(id);
if (!output || !output.stream) {
return null;
}
const buffer = this.bufferStreams[output.index];
buffer.unpipe(output.stream);
buffer.end();
this.bufferStreams[output.index] = null;
return output.stream;
}

addFilter(id: string, filter: RelayFilter): boolean {
const connection = this.findConnection(id);
if (!connection) {
return false;
}
connection.filters.push(filter);
return true;
}

private findConnection(id: string): StreamConnection {
return this.outputs.find((o: StreamConnection) => o.id === id);
}

private relayMessages(msg: any) {
for (let i = 0; i < this.numConnections; i++) {
const buffer: PassThrough = this.bufferStreams[i];
if (!buffer) {
continue;
}
const connection: StreamConnection = this.outputs[i];
const shouldRelay: boolean = connection.filters.reduce((prev, filter) => prev && filter(msg), true);
if (!shouldRelay) {
continue;
}
// We ignore the backpressure complaints here, because we explicitly design the PassThrough stream to act as
// a buffer
buffer.write(msg);
}
}
}

export type RelayFilter = (msg: any) => boolean;

interface StreamConnection {
id: string;
index: number;
stream: Writable;
filters: RelayFilter[];
}

0 comments on commit 585db0c

Please sign in to comment.