Skip to content

Commit

Permalink
Merge pull request coinbase#201 from blair/user-defined-type-guards
Browse files Browse the repository at this point in the history
User defined type guards
  • Loading branch information
fb55 authored Apr 10, 2018
2 parents a26436f + 65d24e4 commit d7495d7
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 32 deletions.
11 changes: 5 additions & 6 deletions src/core/HFTFilter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import { BaseOrderMessage,
ChangedOrderMessage,
isOrderMessage,
isBaseOrderMessage,
isStreamMessage,
NewOrderMessage,
StreamMessage } from './Messages';
Expand Down Expand Up @@ -84,9 +84,8 @@ export class HFTFilter extends Duplex {
* Add the message to the queue
*/
addMessage(message: StreamMessage): boolean {
if (isOrderMessage(message)) {
const order = message as BaseOrderMessage;
this.messagesById[order.orderId] = order;
if (isBaseOrderMessage(message)) {
this.messagesById[message.orderId] = message;
}
this.messages.insert(message);
return true;
Expand Down Expand Up @@ -143,8 +142,8 @@ export class HFTFilter extends Duplex {
const node = this.messages.min();
if (node) {
assert(this.messages.remove(node));
if (isOrderMessage(node)) {
delete this.messagesById[(node as BaseOrderMessage).orderId];
if (isBaseOrderMessage(node)) {
delete this.messagesById[node.orderId];
}
}
return node;
Expand Down
6 changes: 3 additions & 3 deletions src/core/LiveOrderbook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ export class LiveOrderbook extends Duplex implements Orderbook {
// Pass the msg on to downstream users
this.push(msg);
// Process the message for the orderbook state
if (!isStreamMessage(msg) || !msg.productId) {
if (msg.productId !== this.product) {
return callback();
}
if (msg.productId !== this.product) {
if (!isStreamMessage(msg)) {
return callback();
}
switch (msg.type) {
Expand All @@ -184,7 +184,7 @@ export class LiveOrderbook extends Duplex implements Orderbook {
this.emit('LiveOrderbook.trade', msg);
break;
default:
this.processLevel3Messages(msg);
this.processLevel3Messages(msg as OrderbookMessage);
this.emit('LiveOrderbook.update', msg);
break;
}
Expand Down
20 changes: 12 additions & 8 deletions src/core/MessageQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
***************************************************************************************************************************/

import { Logger, ConsoleLoggerFactory } from '../utils/Logger';
import { OrderbookMessage, BaseOrderMessage } from './Messages';
import { isSequencedMessage,
isSnapshotMessage,
BaseOrderMessage,
OrderbookMessage,
SequencedMessage } from './Messages';
import { RBTree } from 'bintrees';
import assert = require('assert');
import { Duplex } from 'stream';
Expand Down Expand Up @@ -70,7 +74,7 @@ export interface MessageQueueConfig {
*/
export class MessageQueue extends Duplex {
private logger: Logger;
private readonly messages: RBTree<OrderbookMessage>;
private readonly messages: RBTree<SequencedMessage>;
private targetQueueLength: number;
private lastSequence: number;
private productId: string;
Expand Down Expand Up @@ -109,7 +113,7 @@ export class MessageQueue extends Duplex {
*/
end() {
// Clear the queue first
let message: OrderbookMessage;
let message: SequencedMessage;
// tslint:disable-next-line:no-conditional-assignment
while (message = this.pop()) {
this.push(message);
Expand All @@ -121,8 +125,8 @@ export class MessageQueue extends Duplex {
* Add the message to the queue
* @param message
*/
addMessage(message: OrderbookMessage): void {
if (message.type === 'snapshot' && this.waitForSnapshot) {
addMessage(message: SequencedMessage): void {
if (isSnapshotMessage(message) && this.waitForSnapshot) {
this.lastSequence = message.sequence - 1;
} else {
this.messages.insert(message);
Expand Down Expand Up @@ -178,7 +182,7 @@ export class MessageQueue extends Duplex {
const node = this.messages.min();
if (node) {
// If we haven't emitted any messages yet, and we're waiting for a snapshot, it must be the first message
if (node.sequence && expectedSequence < 0 && this.waitForSnapshot && node.type !== 'snapshot') {
if (node.sequence && expectedSequence < 0 && this.waitForSnapshot && !isSnapshotMessage(node)) {
return null;
}
// If we've received a snapshot, old messages can be discarded
Expand Down Expand Up @@ -211,8 +215,8 @@ export class MessageQueue extends Duplex {
if (msg.productId !== this.productId) {
return false;
}
if (msg.sequence) {
this.addMessage(msg as OrderbookMessage);
if (isSequencedMessage(msg)) {
this.addMessage(msg);
}
return true;
}
Expand Down
37 changes: 25 additions & 12 deletions src/core/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface StreamMessage {
origin?: any;
}

export function isStreamMessage(msg: any): boolean {
export function isStreamMessage(msg: any): msg is StreamMessage {
return !!msg.type;
}

Expand All @@ -45,8 +45,8 @@ export interface HTTPErrorMessage extends ErrorMessage {
};
}

export function isErrorMessage(msg: any): boolean {
return isStreamMessage(msg) && !!msg.message && typeof msg.message === 'string';
export function isErrorMessage(msg: any): msg is ErrorMessage {
return msg.type === 'error';
}

/**
Expand All @@ -62,22 +62,30 @@ export interface UnknownMessage extends StreamMessage {
extra?: any;
}

export function isUnknownMessage(msg: any): boolean {
return isStreamMessage(msg) && !!msg.message && typeof msg.message !== 'string';
export function isUnknownMessage(msg: any): msg is UnknownMessage {
return msg.type === 'unknown';
}

export interface SequencedMessage {
sequence: number;
sourceSequence?: number;
}

export function isSequencedMessage(msg: any): msg is SequencedMessage {
return typeof msg.sequence === 'number';
}

/**
* Root definition for messages that stem from a websocket feed
*/
export interface OrderbookMessage extends StreamMessage {
sequence: number;
sourceSequence?: number;
export interface OrderbookMessage extends SequencedMessage, StreamMessage {
type: 'newOrder' | 'orderDone' | 'changedOrder' | 'level';
productId: string;
side: string;
}

export function isOrderbookMessage(msg: any): boolean {
return msg.sequence && msg.productId && msg.side;
export function isOrderbookMessage(msg: any): msg is OrderbookMessage {
return isStreamMessage(msg) && isSequencedMessage(msg) && !!(msg as OrderbookMessage).productId && !!(msg as OrderbookMessage).side;
}

// ---------------------------------------- Order-level (Level 3) Messages --------------------------------------------//
Expand All @@ -86,12 +94,13 @@ export function isOrderbookMessage(msg: any): boolean {
* Message representing the common state for a resting order (for an order request, see PlaceOrderRequest)
*/
export interface BaseOrderMessage extends OrderbookMessage {
type: 'newOrder' | 'orderDone' | 'changedOrder';
orderId: string;
price: string;
}

export function isOrderMessage(msg: any): boolean {
return msg.orderId && msg.side && msg.price;
export function isBaseOrderMessage(msg: any): msg is BaseOrderMessage {
return msg.orderId && msg.price && isOrderbookMessage(msg);
}

/**
Expand Down Expand Up @@ -155,6 +164,10 @@ export interface SnapshotMessage extends StreamMessage, OrderbookState {
productId: string;
}

export function isSnapshotMessage(msg: any): msg is SnapshotMessage {
return msg.type === 'snapshot';
}

export interface TickerMessage extends StreamMessage, Ticker {
type: 'ticker';
productId: string;
Expand Down
5 changes: 2 additions & 3 deletions src/lib/Orderbook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
***************************************************************************************************************************/

import { RBTree } from 'bintrees';
import { SequencedMessage } from '../core/Messages';
import { OrderPool } from './BookBuilder';
import { Big, BigJS } from './types';

Expand Down Expand Up @@ -85,9 +86,7 @@ export interface CumulativePriceLevel extends PriceLevelWithOrders {
cumValue: BigJS;
}

export interface OrderbookState {
sequence: number;
sourceSequence?: number;
export interface OrderbookState extends SequencedMessage {
asks: PriceLevelWithOrders[];
bids: PriceLevelWithOrders[];
orderPool: OrderPool;
Expand Down

0 comments on commit d7495d7

Please sign in to comment.