Skip to content

Commit

Permalink
Fixes and improvements to Bittrex WS (coinbase#102)
Browse files Browse the repository at this point in the history
* add dev version of node.bittrex.api

* convert BittrexFeed subscribe to asynchronous function because verity depends on asynchronous functions inside

* add dev version of node.bittrex.api

* + updated to new library
+ typescript module imports not working, switch back to require

* + queryExchangeState actually depends on SubscribeToExchangeDeltas

* get rid of annoying debugging objects

* Some cleanup
  • Loading branch information
CjS77 authored Jan 6, 2018
1 parent 6e6f107 commit 25788b0
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 130 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"crypto": "0.0.3",
"gdax": "https://github.com/coinbase/gdax-node.git#32360ad73517f168054585a1b0fd6ae3d7e12a77",
"limiter": "git://github.com/jhurliman/node-rate-limiter.git#58ce2fda6b5c2bc4ccb81ba3768c5b1bc06c91a5",
"node.bittrex.api": "0.5.1",
"node-bittrex-api": "0.7.7",
"pushbullet": "2.0.0",
"querystring": "0.2.0",
"simple-mock": "0.8.0",
Expand Down
72 changes: 47 additions & 25 deletions src/exchanges/bittrex/BittrexFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
***************************************************************************************************************************/

import { ExchangeFeed, ExchangeFeedConfig } from '../ExchangeFeed';
import * as Bittrex from 'node.bittrex.api';
import { LevelMessage, SnapshotMessage, TickerMessage, TradeMessage } from '../../core/Messages';
import { BittrexAPI } from './BittrexAPI';
import { Big } from '../../lib/types';
import { OrderPool } from '../../lib/BookBuilder';
import { Level3Order, PriceLevelWithOrders } from '../../lib/Orderbook';
const Bittrex = require('node-bittrex-api');

export class BittrexFeed extends ExchangeFeed {
private client: any;
Expand Down Expand Up @@ -46,38 +46,60 @@ export class BittrexFeed extends ExchangeFeed {
return 'Bittrex';
}

subscribe(products: string[]): boolean {
subscribe(products: string[]): Promise<boolean> {
if (!this.connection) {
return false;
return Promise.reject(false);
}
products.forEach((product: string) => {
this.client.call('CoreHub', 'SubscribeToExchangeDeltas', product).done((err: Error, result: boolean) => {
if (err) {
return console.error(err);
}
return Promise.all(products.map((product: string) => {
return new Promise<boolean>((resolve, reject) => {
this.client.call('CoreHub', 'SubscribeToExchangeDeltas', product).done((err: Error, result: boolean) => {
if (err) {
return reject(err);
}

if (result === true) {
this.log('info', `Subscribed to ${product} on ${this.owner}`);
}
});
this.client.call('CoreHub', 'queryExchangeState', product).done((err: Error, data: any) => {
const snapshot: SnapshotMessage = this.processSnapshot(product, data);
this.push(snapshot);
if (!result) {
const msg = `Failed to subscribeExchangeDeltas to ${product} on ${this.owner}`;
this.log('info', msg);
return reject(new Error(msg));
}

this.client.call('CoreHub', 'queryExchangeState', product).done((queryErr: Error, data: any) => {
if (queryErr) {
return reject(queryErr);
}
if (!data) {
const msg = `failed to queryExchangeState to ${product} on ${this.owner}`;
this.log('error', msg);
return reject(new Error(msg));
}
const snapshot: SnapshotMessage = this.processSnapshot(product, data);
this.push(snapshot);
return resolve(true);
});
});
});
})).then(() => {
// Every result is guaranteed to be true
return Promise.resolve(true);
}).catch((err) => {
return Promise.reject(err);
});
return true;
}

protected connect() {
const client = this.client = this.client = Bittrex.websockets.client();
client.serviceHandlers.messageReceived = (msg: any) => this.handleMessage(msg);
client.serviceHandlers.bound = () => this.onNewConnection();
client.serviceHandlers.disconnected = (code: number, reason: string) => this.onClose(code, reason);
client.serviceHandlers.onerror = (err: Error) => this.onError(err);
client.serviceHandlers.connected = (connection: any) => {
this.connection = connection;
this.emit('websocket-connection');
};
Bittrex.websockets.client(
(client: any) => {
this.client = client;
client.serviceHandlers.messageReceived = (msg: any) => this.handleMessage(msg);
client.serviceHandlers.bound = () => this.onNewConnection();
client.serviceHandlers.disconnected = (code: number, reason: string) => this.onClose(code, reason);
client.serviceHandlers.onerror = (err: Error) => this.onError(err);
client.serviceHandlers.connected = (connection: any) => {
this.connection = connection;
this.emit('websocket-connection');
};
}
);
}

protected handleMessage(msg: any): void {
Expand Down
9 changes: 6 additions & 3 deletions src/factories/bittrexFactories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ export function getSubscribedFeeds(options: ExchangeFeedConfig, products: string
return reject(new Error('TIMEOUT. Could not connect to Bittrex Feed server'));
}, 30000);
feed.on('websocket-connection', () => {
feed.subscribe(products);
clearTimeout(timeout);
return resolve(feed);
feed.subscribe(products).then(() => {
clearTimeout(timeout);
return resolve(feed);
}).catch((err: Error) => {
return reject(err);
});
});
});
}
Expand Down
60 changes: 33 additions & 27 deletions src/samples/bittrexWSdemo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,42 @@ const product: string = 'BTC-ETH';
const config: ExchangeFeedConfig = {
logger: logger,
auth: null,
wsUrl: null,
wsUrl: null
};

const feed: BittrexFeed = new BittrexFeed(config);

feed.on('websocket-connection', () => {
feed.subscribe([product]);
});
const book = new LiveOrderbook({ logger: logger, product: product, strictMode: false });
book.on('LiveOrderbook.snapshot', () => {
logger.log('info', 'Snapshot received by LiveOrderbook Demo');
setInterval(() => {
console.log(printOrderbook(book, 10, 4, 4));
}, 5000);
});
book.on('LiveOrderbook.ticker', (ticker: Ticker) => {
console.log(printTicker(ticker));
});
book.on('LiveOrderbook.trade', (trade: TradeMessage) => {
logger.log('info', `${trade.side} ${trade.size} on ${trade.productId} at ${trade.price}`);
feed.subscribe([product]).then(() => {
doSomethingWithFeed();
});
});
book.on('LiveOrderbook.skippedMessage', (details: SkippedMessageEvent) => {
console.log('SKIPPED MESSAGE', details);
console.log('Reconnecting to feed');
feed.reconnect(1000);
});
book.on('end', () => {
console.log('Orderbook closed');
});
book.on('error', (err: Error) => {
console.log('Livebook errored: ', err);

function doSomethingWithFeed() {
const book = new LiveOrderbook({ logger: logger, product: product, strictMode: false });
book.on('LiveOrderbook.snapshot', () => {
logger.log('info', 'Snapshot received by LiveOrderbook Demo');
setInterval(() => {
console.log(printOrderbook(book, 10, 4, 4));
}, 5000);
});
book.on('LiveOrderbook.ticker', (ticker: Ticker) => {
console.log(printTicker(ticker));
});
book.on('LiveOrderbook.trade', (trade: TradeMessage) => {
logger.log('info', `${trade.side} ${trade.size} on ${trade.productId} at ${trade.price}`);
});
book.on('LiveOrderbook.skippedMessage', (details: SkippedMessageEvent) => {
console.log('SKIPPED MESSAGE', details);
console.log('Reconnecting to feed');
feed.reconnect(1000);
});
book.on('end', () => {
console.log('Orderbook closed');
});
book.on('error', (err: Error) => {
console.log('Livebook errored: ', err);
feed.pipe(book);
});
feed.pipe(book);
});
feed.pipe(book);
}
Loading

0 comments on commit 25788b0

Please sign in to comment.