Skip to content

Commit

Permalink
Merge branch 'master' into delta
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoscon committed Nov 26, 2021
2 parents c86153d + d8da432 commit 34e8feb
Show file tree
Hide file tree
Showing 41 changed files with 111 additions and 154 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* Feature: Add support for sandbox/testnet on BinanceFutures.
* Feature: New exchange - Crypto.com.
* Bugfix: Fix MongoDB backend.
* Update: reduce code duplication for candle interval normalization.
* Update: Simplify code around address specification and selection when using sandbox/testnet.

### 2.1.0 (2021-11-14)
* Bugfix: Update binance user data streams to use cdef types.
Expand Down
3 changes: 3 additions & 0 deletions cryptofeed/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
class Exchange:
id = NotImplemented
symbol_endpoint = NotImplemented
websocket_endpoint = NotImplemented
sandbox_endpoint = NotImplemented
_parse_symbol_data = NotImplemented
websocket_channels = NotImplemented
request_limit = NotImplemented
valid_candle_intervals = NotImplemented
candle_interval_map = NotImplemented
http_sync = HTTPSync()

def __init__(self, config=None, sandbox=False, subaccount=None, **kwargs):
Expand Down
5 changes: 1 addition & 4 deletions cryptofeed/exchanges/ascendex.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
class AscendEX(Feed):
id = ASCENDEX
symbol_endpoint = 'https://ascendex.com/api/pro/v1/products'
websocket_endpoint = 'wss://ascendex.com/1/api/pro/v1/stream'
websocket_channels = {
L2_BOOK: 'depth:',
TRADES: 'trades:',
Expand All @@ -49,10 +50,6 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:

return ret, info

def __init__(self, **kwargs):
super().__init__('wss://ascendex.com/1/api/pro/v1/stream', **kwargs)
self.__reset()

def __reset(self):
self._l2_book = {}
self.seq_no = defaultdict(lambda: None)
Expand Down
22 changes: 8 additions & 14 deletions cryptofeed/exchanges/bequant.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@
class Bequant(Feed):
id = BEQUANT
symbol_endpoint = 'https://api.bequant.io/api/2/public/symbol'
websocket_endpoint = {
'market': 'wss://api.bequant.io/api/2/ws/public',
'trading': 'wss://api.bequant.io/api/2/ws/trading',
'account': 'wss://api.bequant.io/api/2/ws/account',
}
valid_candle_intervals = {'1m', '3m', '5m', '15m', '30m', '1h', '4h', '1d', '1w', '1M'}
candle_interval_map = {'1m': 'M1', '3m': 'M3', '5m': 'M5', '15m': 'M15', '30m': 'M30', '1h': 'H1', '4h': 'H4', '1d': 'D1', '1w': 'D7', '1M': '1M'}
websocket_channels = {
BALANCES: 'subscribeBalance',
TRANSACTIONS: 'subscribeTransactions',
Expand Down Expand Up @@ -65,18 +71,6 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:

return ret, info

def __init__(self, **kwargs):
urls = {
'market': 'wss://api.bequant.io/api/2/ws/public',
'trading': 'wss://api.bequant.io/api/2/ws/trading',
'account': 'wss://api.bequant.io/api/2/ws/account',
}
super().__init__(urls, **kwargs)
interval_map = {'1m': 'M1', '3m': 'M3', '5m': 'M5', '15m': 'M15', '30m': 'M30', '1h': 'H1', '4h': 'H4', '1d': 'D1', '1w': 'D7', '1M': '1M'}
self.candle_interval = interval_map[self.candle_interval]
self.normalize_interval = {value: key for key, value in interval_map.items()}
self.__reset()

def __reset(self):
self._l2_book = {}
self.seq_no = {}
Expand Down Expand Up @@ -171,7 +165,7 @@ async def _candles(self, msg: dict, timestamp: float):
}
"""

interval = str(self.normalize_interval[msg['period']])
interval = str(self.normalize_candle_interval[msg['period']])

for candle in msg['data']:
start = self.timestamp_normalize(candle['timestamp'])
Expand Down Expand Up @@ -415,7 +409,7 @@ async def subscribe(self, conn: AsyncConnection):
"symbol": symbol,
}
if chan == "subscribeCandles":
params['period'] = self.candle_interval
params['period'] = self.candle_interval_map[self.candle_interval]
LOG.debug(f'{self.id}: Subscribing to "{chan}"" with params {params}')
await conn.write(json.dumps(
{
Expand Down
10 changes: 6 additions & 4 deletions cryptofeed/exchanges/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
class Binance(Feed, BinanceRestMixin):
id = BINANCE
symbol_endpoint = 'https://api.binance.com/api/v3/exchangeInfo'
websocket_endpoint = 'wss://stream.binance.com:9443'
listen_key_endpoint = 'userDataStream'
valid_depths = [5, 10, 20, 50, 100, 500, 1000, 5000]
# m -> minutes; h -> hours; d -> days; w -> weeks; M -> months
Expand Down Expand Up @@ -83,8 +84,7 @@ def __init__(self, candle_closed_only=False, depth_interval='100ms', **kwargs):
if depth_interval is not None and depth_interval not in self.valid_depth_intervals:
raise ValueError(f"Depth interval must be one of {self.valid_depth_intervals}")

super().__init__({}, **kwargs)
self.ws_endpoint = 'wss://stream.binance.com:9443'
super().__init__(**kwargs)
self.rest_endpoint = 'https://www.binance.com/api/v3'
self.candle_closed_only = candle_closed_only
self.depth_interval = depth_interval
Expand All @@ -106,9 +106,11 @@ def _address(self) -> Union[str, Dict]:
"""
if self.requires_authentication:
listen_key = self._generate_token()
address = self.ws_endpoint + '/ws/' + listen_key
address = self.websocket_endpoint if not self.sandbox else self.sandbox_endpoint
address += '/ws/' + listen_key
else:
address = self.ws_endpoint + '/stream?streams='
address = self.websocket_endpoint if not self.sandbox else self.sandbox_endpoint
address += '/stream?streams='
subs = []

is_any_private = any(self.is_authenticated_channel(chan) for chan in self.subscription)
Expand Down
2 changes: 1 addition & 1 deletion cryptofeed/exchanges/binance_delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class BinanceDelivery(Binance, BinanceDeliveryRestMixin):
id = BINANCE_DELIVERY
symbol_endpoint = 'https://dapi.binance.com/dapi/v1/exchangeInfo'
websocket_endpoint = 'wss://dstream.binance.com'
listen_key_endpoint = 'listenKey'
valid_depths = [5, 10, 20, 50, 100, 500, 1000]
valid_depth_intervals = {'100ms', '250ms', '500ms'}
Expand All @@ -35,7 +36,6 @@ class BinanceDelivery(Binance, BinanceDeliveryRestMixin):
def __init__(self, **kwargs):
super().__init__(**kwargs)
# overwrite values previously set by the super class Binance
self.ws_endpoint = 'wss://dstream.binance.com'
self.rest_endpoint = 'https://dapi.binance.com/dapi/v1'
self.address = self._address()
self.ws_defaults['compression'] = None
Expand Down
3 changes: 2 additions & 1 deletion cryptofeed/exchanges/binance_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
class BinanceFutures(Binance, BinanceFuturesRestMixin):
id = BINANCE_FUTURES
symbol_endpoint = 'https://fapi.binance.com/fapi/v1/exchangeInfo'
websocket_endpoint = 'wss://fstream.binance.com'
sandbox_endpoint = "wss://stream.binancefuture.com"
listen_key_endpoint = 'listenKey'
valid_depths = [5, 10, 20, 50, 100, 500, 1000]
valid_depth_intervals = {'100ms', '250ms', '500ms'}
Expand Down Expand Up @@ -51,7 +53,6 @@ def __init__(self, open_interest_interval=1.0, **kwargs):
"""
super().__init__(**kwargs)
# overwrite values previously set by the super class Binance
self.ws_endpoint = 'wss://fstream.binance.com' if not self.sandbox else "wss://stream.binancefuture.com"
self.rest_endpoint = 'https://fapi.binance.com/fapi/v1' if not self.sandbox else "https://testnet.binancefuture.com/fapi/v1"
self.address = self._address()
self.ws_defaults['compression'] = None
Expand Down
2 changes: 1 addition & 1 deletion cryptofeed/exchanges/binance_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
class BinanceUS(Binance, BinanceUSRestMixin):
id = BINANCE_US
symbol_endpoint = 'https://api.binance.us/api/v3/exchangeInfo'
websocket_endpoint = 'wss://stream.binance.us:9443'

def __init__(self, **kwargs):
super().__init__(**kwargs)
# overwrite values previously set by the super class Binance
self.ws_endpoint = 'wss://stream.binance.us:9443'
self.rest_endpoint = 'https://api.binance.us/api/v3'
self.address = self._address()
18 changes: 7 additions & 11 deletions cryptofeed/exchanges/bitfinex.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Bitfinex(Feed, BitfinexRestMixin):
'https://api-pub.bitfinex.com/v2/conf/pub:list:currency',
'https://api-pub.bitfinex.com/v2/conf/pub:list:pair:futures',
]
websocket_endpoint = 'wss://api.bitfinex.com/ws/2'
websocket_channels = {
L3_BOOK: 'book-R0-{}-{}',
L2_BOOK: 'book-P0-{}-{}',
Expand Down Expand Up @@ -100,18 +101,13 @@ def _parse_symbol_data(cls, data: list) -> Tuple[Dict, Dict]:

return ret, info

def __init__(self, symbols=None, channels=None, subscription=None, number_of_price_points: int = 100,
book_frequency: str = 'F0', **kwargs):
if number_of_price_points not in (1, 25, 100, 250):
raise ValueError("number_of_price_points should be in 1, 25, 100, 250")
if book_frequency not in ('F0', 'F1'):
raise ValueError("book_frequency should be in F0, F1")

if symbols is not None and channels is not None:
super().__init__('wss://api.bitfinex.com/ws/2', symbols=symbols, channels=channels, **kwargs)
else:
super().__init__('wss://api.bitfinex.com/ws/2', subscription=subscription, **kwargs)
def __init__(self, symbols=None, channels=None, subscription=None, number_of_price_points: int = 100, book_frequency: str = 'F0', **kwargs):
if number_of_price_points not in {1, 25, 100, 250}:
raise ValueError("number_of_price_points should be one of 1, 25, 100, 250")
if book_frequency not in {'F0', 'F1'}:
raise ValueError("book_frequency should be one of F0, F1")

super().__init__(symbols=symbols, channels=channels, subscription=subscription, **kwargs)
self.number_of_price_points = number_of_price_points
self.book_frequency = book_frequency
if channels or subscription:
Expand Down
4 changes: 1 addition & 3 deletions cryptofeed/exchanges/bitflyer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
class Bitflyer(Feed):
id = BITFLYER
symbol_endpoint = endpoints = ['https://api.bitflyer.com/v1/getmarkets/eu', 'https://api.bitflyer.com/v1/getmarkets/usa', 'https://api.bitflyer.com/v1/getmarkets']
websocket_endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
websocket_channels = {
L2_BOOK: 'lightning_board_{}',
TRADES: 'lightning_executions_{}',
Expand Down Expand Up @@ -52,9 +53,6 @@ def _parse_symbol_data(cls, data: list) -> Tuple[Dict, Dict]:
info['instrument_type'][s.normalized] = stype
return ret, info

def __init__(self, **kwargs):
super().__init__('wss://ws.lightstream.bitflyer.com/json-rpc', **kwargs)

def __reset(self):
self._l2_book = {}

Expand Down
3 changes: 2 additions & 1 deletion cryptofeed/exchanges/bithumb.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Bithumb(Feed):
('https://api.bithumb.com/public/ticker/ALL_BTC', 'BTC'),
('https://api.bithumb.com/public/ticker/ALL_KRW', 'KRW')
]
websocket_endpoint = "wss://pubwss.bithumb.com/pub/ws"
websocket_channels = {
# L2_BOOK: 'orderbookdepth', <-- technically the exchange supports orderbooks but it only provides orderbook deltas, there is
# no way to synchronize against a rest snapshot, nor request/obtain an orderbook via the websocket, so this isn't really useful
Expand Down Expand Up @@ -87,7 +88,7 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
return ret, info

def __init__(self, max_depth=30, **kwargs):
super().__init__("wss://pubwss.bithumb.com/pub/ws", max_depth=max_depth, **kwargs)
super().__init__(max_depth=max_depth, **kwargs)

async def _trades(self, msg: dict, rtimestamp: float):
'''
Expand Down
7 changes: 4 additions & 3 deletions cryptofeed/exchanges/bitmex.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
class Bitmex(Feed, BitmexRestMixin):
id = BITMEX
symbol_endpoint = "https://www.bitmex.com/api/v1/instrument/active"
websocket_endpoint = 'wss://www.bitmex.com/realtime'
sandbox_endpoint = 'wss://testnet.bitmex.com/realtime'
websocket_channels = {
L2_BOOK: 'orderBookL2',
TRADES: 'trade',
Expand Down Expand Up @@ -58,9 +60,8 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:

return ret, info

def __init__(self, sandbox=False, **kwargs):
auth_api = 'wss://www.bitmex.com/realtime' if not sandbox else 'wss://testnet.bitmex.com/realtime'
super().__init__(auth_api, **kwargs)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.ws_defaults['compression'] = None
self._reset()

Expand Down
3 changes: 2 additions & 1 deletion cryptofeed/exchanges/bitstamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Bitstamp(Feed, BitstampRestMixin):
id = BITSTAMP
symbol_endpoint = "https://www.bitstamp.net/api/v2/trading-pairs-info/"
# API documentation: https://www.bitstamp.net/websocket/v2/
websocket_endpoint = 'wss://ws.bitstamp.net/'
websocket_channels = {
L3_BOOK: 'detail_order_book',
L2_BOOK: 'diff_order_book',
Expand Down Expand Up @@ -55,7 +56,7 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
return ret, info

def __init__(self, **kwargs):
super().__init__('wss://ws.bitstamp.net/', **kwargs)
super().__init__(**kwargs)
self.ws_defaults['compression'] = None

async def _process_l2_book(self, msg: dict, timestamp: float):
Expand Down
2 changes: 1 addition & 1 deletion cryptofeed/exchanges/bittrex.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
return ret, info

def __init__(self, **kwargs):
super().__init__('wss://socket-v3.bittrex.com/signalr/connect', **kwargs)
super().__init__(**kwargs)
r = requests.get('https://socket-v3.bittrex.com/signalr/negotiate', params={'connectionData': json.dumps([{'name': 'c3'}]), 'clientProtocol': 1.5})
token = r.json()['ConnectionToken']
url = requests.Request('GET', 'https://socket-v3.bittrex.com/signalr/connect', params={'transport': 'webSockets', 'connectionToken': token, 'connectionData': json.dumps([{"name": "c3"}]), 'clientProtocol': 1.5}).prepare().url
Expand Down
3 changes: 2 additions & 1 deletion cryptofeed/exchanges/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
class Blockchain(Feed):
id = BLOCKCHAIN
symbol_endpoint = "https://api.blockchain.com/mercury-gateway/v1/instruments"
websocket_endpoint = "wss://ws.prod.blockchain.info/mercury-gateway/v1/ws"
websocket_channels = {
L3_BOOK: 'l3',
L2_BOOK: 'l2',
Expand All @@ -44,7 +45,7 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
return ret, info

def __init__(self, **kwargs):
super().__init__("wss://ws.prod.blockchain.info/mercury-gateway/v1/ws", origin="https://exchange.blockchain.com", **kwargs)
super().__init__(origin="https://exchange.blockchain.com", **kwargs)
self.__reset()

def __reset(self):
Expand Down
11 changes: 8 additions & 3 deletions cryptofeed/exchanges/bybit.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
class Bybit(Feed):
id = BYBIT
symbol_endpoint = 'https://api.bybit.com/v2/public/symbols'
websocket_endpoint = {
'USD': 'wss://stream.bybit.com/realtime',
'USDT': 'wss://stream.bybit.com/realtime_public',
'USDTP': 'wss://stream.bybit.com/realtime_private'
}
websocket_channels = {
L2_BOOK: 'orderBook_200.100ms',
TRADES: 'trade',
Expand All @@ -41,6 +46,7 @@ class Bybit(Feed):
# BALANCES: 'position' removing temporarily, this is a position, not a balance
}
valid_candle_intervals = {'1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '1d', '1w', '1M'}
candle_interval_map = {'1m': '1', '3m': '3', '5m': '5', '15m': '15', '30m': '30', '1h': '60', '2h': '120', '4h': '240', '6h': '360', '1d': 'D', '1w': 'W', '1M': 'M'}

@classmethod
def timestamp_normalize(cls, ts: Union[int, dt]) -> float:
Expand Down Expand Up @@ -75,9 +81,8 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
return ret, info

def __init__(self, **kwargs):
super().__init__({'USD': 'wss://stream.bybit.com/realtime', 'USDT': 'wss://stream.bybit.com/realtime_public', 'USDTP': 'wss://stream.bybit.com/realtime_private'}, **kwargs)
super().__init__(**kwargs)
self.ws_defaults['compression'] = None
self.candle_mapping = {'1m': '1', '3m': '3', '5m': '5', '15m': '15', '30m': '30', '1h': '60', '2h': '120', '4h': '240', '6h': '360', '1d': 'D', '1w': 'W', '1M': 'M'}

def __reset(self, quote=None):
self._instrument_info_cache = {}
Expand Down Expand Up @@ -233,7 +238,7 @@ async def subscribe(self, connection: AsyncConnection, quote: str = None):
await connection.write(json.dumps(
{
"op": "subscribe",
"args": [f"{chan}.{pair}"] if self.exchange_channel_to_std(chan) != CANDLES else [f"{chan if quote == 'USD' else 'candle'}.{self.candle_mapping[self.candle_interval]}.{pair}"]
"args": [f"{chan}.{pair}"] if self.exchange_channel_to_std(chan) != CANDLES else [f"{chan if quote == 'USD' else 'candle'}.{self.candle_interval_map[self.candle_interval]}.{pair}"]
}
))
LOG.debug(f'{connection.uuid}: Subscribing to public, quote: {quote}, {chan}.{pair}')
Expand Down
3 changes: 2 additions & 1 deletion cryptofeed/exchanges/coinbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
class Coinbase(Feed, CoinbaseRestMixin):
id = COINBASE
symbol_endpoint = 'https://api.pro.coinbase.com/products'
websocket_endpoint = 'wss://ws-feed.pro.coinbase.com'
websocket_channels = {
L2_BOOK: 'level2',
L3_BOOK: 'full',
Expand All @@ -48,7 +49,7 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
return ret, info

def __init__(self, callbacks=None, **kwargs):
super().__init__('wss://ws-feed.pro.coinbase.com', callbacks=callbacks, **kwargs)
super().__init__(callbacks=callbacks, **kwargs)
self.ws_defaults['compression'] = None
# we only keep track of the L3 order book if we have at least one subscribed order-book callback.
# use case: subscribing to the L3 book plus Trade type gives you order_type information (see _received below),
Expand Down
5 changes: 1 addition & 4 deletions cryptofeed/exchanges/cryptodotcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
class CryptoDotCom(Feed):
id = CRYPTODOTCOM
symbol_endpoint = 'https://api.crypto.com/v2/public/get-instruments'
websocket_endpoint = 'wss://stream.crypto.com/v2/market'
websocket_channels = {
L2_BOOK: 'book',
TRADES: 'trade',
Expand All @@ -50,10 +51,6 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
ret[sym.normalized] = entry['instrument_name']
return ret, info

def __init__(self, **kwargs):
super().__init__('wss://stream.crypto.com/v2/market', **kwargs)
self.__reset()

def __reset(self):
self._l2_book = {}

Expand Down
5 changes: 1 addition & 4 deletions cryptofeed/exchanges/deribit.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
class Deribit(Feed, DeribitRestMixin):
id = DERIBIT
symbol_endpoint = ['https://www.deribit.com/api/v2/public/get_instruments?currency=BTC', 'https://www.deribit.com/api/v2/public/get_instruments?currency=ETH']
websocket_endpoint = 'wss://www.deribit.com/ws/api/v2'
websocket_channels = {
L1_BOOK: 'quote',
L2_BOOK: 'book',
Expand Down Expand Up @@ -68,10 +69,6 @@ def _parse_symbol_data(cls, data: list) -> Tuple[Dict, Dict]:
info['instrument_type'][s.normalized] = CURRENCY
return ret, info

def __init__(self, **kwargs):
super().__init__('wss://www.deribit.com/ws/api/v2', **kwargs)
self.__reset()

def __reset(self):
self._open_interest_cache = {}
self._l2_book = {}
Expand Down
Loading

0 comments on commit 34e8feb

Please sign in to comment.