Skip to content

Commit

Permalink
Discriminated unions (coinbase#203)
Browse files Browse the repository at this point in the history
* GDAXFeed#mapMessage: do not set the 'extra' property.

The 'origin' property is already set by handleMessage() and
UnknownMessage's documentation says that

  Any context-rich information can be extracted into the `extra`
  field, and the original message should be attached to the `origin`
  field as usual.

* GDAXMessage: switch to discriminated unions types.

Add a new GDAXMessage that is a union type and rename the old
GDAXMessage to OptionalUserId, dropping the 'GDAX' and the 'Message'
because the 'type property was removed since all sub-interfaces define
it exactly and the since the OptionalUserId super-interface isn't
meant to be publically used now.

Add GDAXHeartbeatMessage and GDAXReceivedMessage which is required to
avoid compiler errors with the current code.

* GDAXMatchMessage: add maker_user_id and taker_user_id properties.

* Move error handling out GDAXFeed#mapMessage.

This is so the GDAXErrorMessage type is consistently handled between
mapMessage and mapAuthMessage, useful for discriminated union.

* GDAXFeed#mapFullFeed: simplify.

* GDAXFeed#mapUnknown: new method to consistently handle messages.

* GDAXFeed#handleMessage: use isGDAXMessage.

Add attachSequence() to attach the incoming sequence number to an
StreamMessage that the Writables will receive.

* GDAXFeed: remove many type assertions.

http://www.typescriptlang.org/docs/handbook/advanced-types.html

This is made possible by TypeScript's discriminated unions. For
example, handleMessage() calls isGDAXMessage(feedMessage) which means
that tsc knows that feedMessage is a GDAXMessage after the if
block. So switching on its type property means that the compiler knows
exactly the sub-interface in the case clause. So no downcasts needd.

Now that GDAXMessage is a union type, in a switch statement, the
TypeScript compiler knows the GDAXMessage specific type so no type
assertions are needed.

* GDAXFeed#mapMessage: remove duplicate variable.

* GDAXFeed#map*: tighten the input argument type.

Do this for mapFullFeed(), mapMessage() and mapAuthMessage().

* GDAXFeed#mapMessage: statically assert that all types are handled.

If one were to comment out a case statement in mapMessage() then tsc
would generate a compile error. See

https://www.typescriptlang.org/docs/handbook/advanced-types.html

* GDAXFeed#mapAuthMessage: explicitly handle all message types.

This is to generate a compile error if any new message types show up
in the future, tsc will tell the developer that the new type needs to
be handled.

* GDAXFeed#mapAuthMessage: remove a type assertion to any.

This is possible because of the stronger types, no need to check for a
property if the compiler tells you it exists.

* GDAXFeed#mapTicker: do not type assert the return object.

See https://palantir.github.io/tslint/rules/no-object-literal-type-assertion/ .

This showed that TickerMessage was missing a sequence property so add
it, however, make it optional since it appears only the GDAX exchange
has sequence numbers in ticker messages.

* GDAXFeed#mapUnknown: set the origin property.

This adds the origin property to the returned object. This fixes a
bug where handleMessage() would receive a non-GDAXMessage and the
UnknownMessage instance it pushes wouldn't have the origin property
set.
  • Loading branch information
blair authored and fb55 committed Apr 14, 2018
1 parent e6c3e6e commit 32d5e8e
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 114 deletions.
1 change: 1 addition & 0 deletions src/core/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export function isSnapshotMessage(msg: any): msg is SnapshotMessage {

export interface TickerMessage extends StreamMessage, Ticker {
type: 'ticker';
sequence?: number;
productId: string;
}

Expand Down
227 changes: 125 additions & 102 deletions src/exchanges/gdax/GDAXFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ import {
GDAXErrorMessage,
GDAXL2UpdateMessage,
GDAXMatchMessage,
GDAXMessage,
GDAXOpenMessage,
GDAXReceivedMessage,
GDAXSnapshotMessage,
GDAXSubscriptionsMessage,
GDAXTickerMessage
GDAXTickerMessage,
isGDAXMessage
} from './GDAXMessages';
import { AuthenticatedExchangeAPI } from '../AuthenticatedExchangeAPI';
import { staticAssertNever } from '../../lib/asserts';
import { Side, SIDES } from '../../lib/sides';
import { Level3Order, PriceLevelWithOrders } from '../../lib/Orderbook';
import { ExchangeFeed, ExchangeFeedConfig } from '../ExchangeFeed';
Expand All @@ -52,6 +54,13 @@ import { AuthHeaders, GDAXAuthConfig } from './GDAXInterfaces';

export const GDAX_WS_FEED = 'wss://ws-feed.gdax.com';

type GDAXTradingMessage =
GDAXReceivedMessage |
GDAXOpenMessage |
GDAXDoneMessage |
GDAXMatchMessage |
GDAXChangeMessage;

/**
* Configuration interface for a GDAX websocket feed stream. `wsUrl` is used to override the default websocket URL.
* Usually, you don't need this, but you may want to obtain a feed from the sandbox for testing, or an historical
Expand Down Expand Up @@ -162,31 +171,39 @@ export class GDAXFeed extends ExchangeFeed {
*/
protected handleMessage(msg: string): void {
try {
const feedMessage: GDAXMessage = JSON.parse(msg);
const feedMessage = JSON.parse(msg);
if (!isGDAXMessage(feedMessage)) {
const m = this.mapUnknown(feedMessage);
this.attachSequence(m, feedMessage);
this.push(m);
return;
}

let message: StreamMessage;
switch (feedMessage.type) {
case 'subscriptions':
this.setProducts(feedMessage as any);
this.setProducts(feedMessage);
return;
case 'heartbeat':
this.confirmAlive();
return;
case 'ticker':
message = this.mapTicker(feedMessage as GDAXTickerMessage);
message = this.mapTicker(feedMessage);
break;
case 'l2update':
this.processUpdate(feedMessage as GDAXL2UpdateMessage);
this.processUpdate(feedMessage);
return;
case 'snapshot':
this.processSnapshot(this.createSnapshotMessage(feedMessage as GDAXSnapshotMessage));
this.processSnapshot(this.createSnapshotMessage(feedMessage));
return;
case 'error':
message = this.mapError(feedMessage);
break;
default:
message = this.mapFullFeed(feedMessage);
}
if (message) {
if ((feedMessage as any).sequence) {
(message as any).sourceSequence = (feedMessage as any).sequence;
}
this.attachSequence(message, feedMessage);
message.origin = feedMessage;
this.push(message);
}
Expand Down Expand Up @@ -290,8 +307,40 @@ export class GDAXFeed extends ExchangeFeed {
});
}

private mapTicker(ticker: GDAXTickerMessage): StreamMessage {
return {
private attachSequence(streamMessage: StreamMessage,
feedMessage: any): void {
if ((feedMessage as any).sequence) {
(streamMessage as any).sourceSequence = (feedMessage as any).sequence;
}
}

private mapError(errorMessage: GDAXErrorMessage): ErrorMessage {
const msg: ErrorMessage = {
type: 'error',
time: new Date(),
message: errorMessage.message,
cause: errorMessage.reason
};
this.emit('feed-error', msg);
return msg;
}

private mapUnknown(unknown: any): UnknownMessage {
const time = unknown.time ? new Date(unknown.time) : new Date();
const product = unknown.product_id;
const sequence = unknown.sequence || (product && this.getSequence(product));
const msg: UnknownMessage = {
type: 'unknown',
time: time,
sequence: sequence,
productId: product,
origin: unknown
};
return msg;
}

private mapTicker(ticker: GDAXTickerMessage): TickerMessage {
const msg: TickerMessage = {
type: 'ticker',
time: ticker.time ? new Date(ticker.time) : new Date(),
productId: ticker.product_id,
Expand All @@ -302,15 +351,16 @@ export class GDAXFeed extends ExchangeFeed {
trade_id: String(ticker.trade_id),
size: Big(ticker.last_size),
volume: Big(ticker.volume_24h)
} as TickerMessage;
};
return msg;
}

private mapFullFeed(feedMessage: GDAXMessage): StreamMessage {
private mapFullFeed(feedMessage: GDAXTradingMessage): StreamMessage {
if (feedMessage.user_id) {
return this.mapAuthMessage(feedMessage);
} else {
return this.mapMessage(feedMessage);
}
const message: StreamMessage = this.mapMessage(feedMessage);
return message;
}

private processSnapshot(snapshot: SnapshotMessage) {
Expand All @@ -322,18 +372,18 @@ export class GDAXFeed extends ExchangeFeed {
* Converts GDAX messages into standardised GTT messages. Unknown messages are passed on as_is
* @param feedMessage
*/
private mapMessage(feedMessage: GDAXMessage): StreamMessage {
private mapMessage(feedMessage: GDAXTradingMessage): StreamMessage {
switch (feedMessage.type) {
case 'open': {
const msg: NewOrderMessage = {
type: 'newOrder',
time: new Date((feedMessage as GDAXOpenMessage).time),
sequence: (feedMessage as GDAXOpenMessage).sequence,
productId: (feedMessage as GDAXOpenMessage).product_id,
orderId: (feedMessage as GDAXOpenMessage).order_id,
side: (feedMessage as GDAXOpenMessage).side,
price: (feedMessage as GDAXOpenMessage).price,
size: (feedMessage as GDAXOpenMessage).remaining_size
time: new Date(feedMessage.time),
sequence: feedMessage.sequence,
productId: feedMessage.product_id,
orderId: feedMessage.order_id,
side: feedMessage.side,
price: feedMessage.price,
size: feedMessage.remaining_size
};
return msg;
}
Expand All @@ -343,71 +393,45 @@ export class GDAXFeed extends ExchangeFeed {
// or rounding, but the accounting is nevertheless correct. So if reason is 'filled' we can set the size
// to zero before removing the order. Otherwise if cancelled, remaining_size refers to the size
// that was on the order book
const size = (feedMessage as GDAXDoneMessage).reason === 'filled' ? '0' : (feedMessage as GDAXDoneMessage).remaining_size;
const size = feedMessage.reason === 'filled' ? '0' : feedMessage.remaining_size;
const msg: OrderDoneMessage = {
type: 'orderDone',
time: new Date((feedMessage as GDAXDoneMessage).time),
sequence: (feedMessage as GDAXDoneMessage).sequence,
productId: (feedMessage as GDAXDoneMessage).product_id,
orderId: (feedMessage as GDAXDoneMessage).order_id,
time: new Date(feedMessage.time),
sequence: feedMessage.sequence,
productId: feedMessage.product_id,
orderId: feedMessage.order_id,
remainingSize: size,
price: (feedMessage as GDAXDoneMessage).price,
side: (feedMessage as GDAXDoneMessage).side,
reason: (feedMessage as GDAXDoneMessage).reason
price: feedMessage.price,
side: feedMessage.side,
reason: feedMessage.reason
};
return msg;
}
case 'match': {
return this.mapMatchMessage(feedMessage as GDAXMatchMessage);
return this.mapMatchMessage(feedMessage);
}
case 'change': {
const change: GDAXChangeMessage = feedMessage as GDAXChangeMessage;
if (change.new_funds && !change.new_size) {
change.new_size = (Big(change.new_funds).div(change.price).toString());
if (feedMessage.new_funds && !feedMessage.new_size) {
feedMessage.new_size = (Big(feedMessage.new_funds).div(feedMessage.price).toString());
}
const msg: ChangedOrderMessage = {
type: 'changedOrder',
time: new Date(change.time),
sequence: change.sequence,
productId: change.product_id,
orderId: change.order_id,
side: change.side,
price: change.price,
newSize: change.new_size
time: new Date(feedMessage.time),
sequence: feedMessage.sequence,
productId: feedMessage.product_id,
orderId: feedMessage.order_id,
side: feedMessage.side,
price: feedMessage.price,
newSize: feedMessage.new_size
};
return msg;
}
case 'error': {
const error: GDAXErrorMessage = feedMessage as GDAXErrorMessage;
const msg: ErrorMessage = {
type: 'error',
time: new Date(),
message: error.message,
cause: error.reason
};
this.emit('feed-error', msg);
return msg;
}
case 'received': {
const msg: UnknownMessage = {
type: 'unknown',
time: new Date(),
sequence: (feedMessage as any).sequence,
productId: (feedMessage as any).product_id,
extra: feedMessage
};
return msg;
return this.mapUnknown(feedMessage);
}
default: {
const product: string = (feedMessage as any).product_id;
const msg: UnknownMessage = {
type: 'unknown',
time: new Date(),
sequence: this.getSequence(product),
productId: product,
extra: feedMessage
};
return msg;
staticAssertNever(feedMessage);
return this.mapUnknown(feedMessage);
}
}
}
Expand All @@ -430,64 +454,63 @@ export class GDAXFeed extends ExchangeFeed {
* When the user_id field is set, these are authenticated messages only we receive and so deserve special
* consideration
*/
private mapAuthMessage(feedMessage: GDAXMessage): StreamMessage {
const time = (feedMessage as any).time ? new Date((feedMessage as any).time) : new Date();
private mapAuthMessage(feedMessage: GDAXTradingMessage): StreamMessage {
switch (feedMessage.type) {
case 'match': {
const isTaker: boolean = !!(feedMessage as any).taker_user_id;
const isTaker: boolean = !!feedMessage.taker_user_id;
let side: Side;
if (!isTaker) {
side = (feedMessage as GDAXMatchMessage).side;
side = feedMessage.side;
} else {
side = (feedMessage as GDAXMatchMessage).side === 'buy' ? 'sell' : 'buy';
side = feedMessage.side === 'buy' ? 'sell' : 'buy';
}
const msg: TradeExecutedMessage = {
type: 'tradeExecuted',
time: time,
productId: (feedMessage as GDAXMatchMessage).product_id,
orderId: isTaker ? (feedMessage as GDAXMatchMessage).taker_order_id : (feedMessage as GDAXMatchMessage).maker_order_id,
time: new Date(feedMessage.time),
productId: feedMessage.product_id,
orderId: isTaker ? feedMessage.taker_order_id : feedMessage.maker_order_id,
orderType: isTaker ? 'market' : 'limit',
side: side,
price: (feedMessage as GDAXMatchMessage).price,
tradeSize: (feedMessage as GDAXMatchMessage).size,
price: feedMessage.price,
tradeSize: feedMessage.size,
remainingSize: null
};
return msg;
}
case 'done': {
const msg: TradeFinalizedMessage = {
type: 'tradeFinalized',
time: time,
productId: (feedMessage as GDAXDoneMessage).product_id,
orderId: (feedMessage as GDAXDoneMessage).order_id,
reason: (feedMessage as GDAXDoneMessage).reason,
side: (feedMessage as GDAXDoneMessage).side,
price: (feedMessage as GDAXDoneMessage).price,
remainingSize: (feedMessage as GDAXDoneMessage).remaining_size
time: new Date(feedMessage.time),
productId: feedMessage.product_id,
orderId: feedMessage.order_id,
reason: feedMessage.reason,
side: feedMessage.side,
price: feedMessage.price,
remainingSize: feedMessage.remaining_size
};
return msg;
}
case 'open': {
const msg: MyOrderPlacedMessage = {
type: 'myOrderPlaced',
time: time,
productId: (feedMessage as GDAXOpenMessage).product_id,
orderId: (feedMessage as GDAXOpenMessage).order_id,
side: (feedMessage as GDAXOpenMessage).side,
price: (feedMessage as GDAXOpenMessage).price,
orderType: (feedMessage as GDAXOpenMessage).type,
size: (feedMessage as GDAXOpenMessage).remaining_size,
sequence: (feedMessage as GDAXOpenMessage).sequence
time: new Date(feedMessage.time),
productId: feedMessage.product_id,
orderId: feedMessage.order_id,
side: feedMessage.side,
price: feedMessage.price,
orderType: feedMessage.type,
size: feedMessage.remaining_size,
sequence: feedMessage.sequence
};
return msg;
}
case 'change':
case 'received': {
return this.mapUnknown(feedMessage);
}
default: {
const msg: UnknownMessage = {
type: 'unknown',
time: time,
productId: (feedMessage as any).product_id
};
return msg;
staticAssertNever(feedMessage);
return this.mapUnknown(feedMessage);
}
}
}
Expand Down
Loading

0 comments on commit 32d5e8e

Please sign in to comment.