Skip to content

Commit

Permalink
Merge pull request freqtrade#1101 from mishaker/ccxt-async
Browse files Browse the repository at this point in the history
use ccxt async for ticker_history download
  • Loading branch information
xmatthias authored Sep 9, 2018
2 parents 062eca1 + 13ffd88 commit 179bcf3
Show file tree
Hide file tree
Showing 14 changed files with 418 additions and 51 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ addons:
install:
- ./install_ta-lib.sh
- export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
- pip install --upgrade flake8 coveralls pytest-random-order mypy
- pip install --upgrade flake8 coveralls pytest-random-order pytest-asyncio mypy
- pip install -r requirements.txt
- pip install -e .
jobs:
include:
- script:
- script:
- pytest --cov=freqtrade --cov-config=.coveragerc freqtrade/tests/
- coveralls
- script:
Expand Down
156 changes: 150 additions & 6 deletions freqtrade/exchange/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# pragma pylint: disable=W0603
""" Cryptocurrency Exchanges support """
import logging
import inspect
from random import randint
from typing import List, Dict, Any, Optional
from typing import List, Dict, Tuple, Any, Optional
from datetime import datetime
from math import floor, ceil

import asyncio
import ccxt
import ccxt.async_support as ccxt_async
import arrow

from freqtrade import constants, OperationalException, DependencyException, TemporaryError
Expand All @@ -23,6 +26,24 @@
}


def retrier_async(f):
async def wrapper(*args, **kwargs):
count = kwargs.pop('count', API_RETRY_COUNT)
try:
return await f(*args, **kwargs)
except (TemporaryError, DependencyException) as ex:
logger.warning('%s() returned exception: "%s"', f.__name__, ex)
if count > 0:
count -= 1
kwargs.update({'count': count})
logger.warning('retrying %s() still for %s times', f.__name__, count)
return await wrapper(*args, **kwargs)
else:
logger.warning('Giving up retrying: %s()', f.__name__)
raise ex
return wrapper


def retrier(f):
def wrapper(*args, **kwargs):
count = kwargs.pop('count', API_RETRY_COUNT)
Expand All @@ -45,8 +66,8 @@ class Exchange(object):

# Current selected exchange
_api: ccxt.Exchange = None
_api_async: ccxt_async.Exchange = None
_conf: Dict = {}
_cached_ticker: Dict[str, Any] = {}

# Holds all open sell orders for dry_run
_dry_run_open_orders: Dict[str, Any] = {}
Expand All @@ -60,11 +81,20 @@ def __init__(self, config: dict) -> None:
"""
self._conf.update(config)

self._cached_ticker: Dict[str, Any] = {}

# Holds last candle refreshed time of each pair
self._pairs_last_refresh_time: Dict[str, int] = {}

# Holds candles
self.klines: Dict[str, Any] = {}

if config['dry_run']:
logger.info('Instance is running with dry_run enabled')

exchange_config = config['exchange']
self._api = self._init_ccxt(exchange_config)
self._api_async = self._init_ccxt(exchange_config, ccxt_async)

logger.info('Using Exchange "%s"', self.name)

Expand All @@ -75,23 +105,31 @@ def __init__(self, config: dict) -> None:
# Check if timeframe is available
self.validate_timeframes(config['ticker_interval'])

def _init_ccxt(self, exchange_config: dict) -> ccxt.Exchange:
def __del__(self):
"""
Destructor - clean up async stuff
"""
logger.debug("Exchange object destroyed, closing async loop")
if self._api_async and inspect.iscoroutinefunction(self._api_async.close):
asyncio.get_event_loop().run_until_complete(self._api_async.close())

def _init_ccxt(self, exchange_config: dict, ccxt_module=ccxt) -> ccxt.Exchange:
"""
Initialize ccxt with given config and return valid
ccxt instance.
"""
# Find matching class for the given exchange name
name = exchange_config['name']

if name not in ccxt.exchanges:
if name not in ccxt_module.exchanges:
raise OperationalException(f'Exchange {name} is not supported')
try:
api = getattr(ccxt, name.lower())({
api = getattr(ccxt_module, name.lower())({
'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''),
'enableRateLimit': exchange_config.get('ccxt_rate_limit', True),
'enableRateLimit': exchange_config.get('ccxt_rate_limit', True)
})
except (KeyError, AttributeError):
raise OperationalException(f'Exchange {name} is not supported')
Expand Down Expand Up @@ -120,6 +158,15 @@ def set_sandbox(self, api, exchange_config: dict, name: str):
"Please check your config.json")
raise OperationalException(f'Exchange {name} does not provide a sandbox api')

def _load_async_markets(self) -> None:
try:
if self._api_async:
asyncio.get_event_loop().run_until_complete(self._api_async.load_markets())

except ccxt.BaseError as e:
logger.warning('Could not load async markets. Reason: %s', e)
return

def validate_pairs(self, pairs: List[str]) -> None:
"""
Checks if all given pairs are tradable on the current exchange.
Expand All @@ -130,6 +177,7 @@ def validate_pairs(self, pairs: List[str]) -> None:

try:
markets = self._api.load_markets()
self._load_async_markets()
except ccxt.BaseError as e:
logger.warning('Unable to validate pairs (assuming they are correct). Reason: %s', e)
return
Expand Down Expand Up @@ -329,6 +377,102 @@ def get_ticker(self, pair: str, refresh: Optional[bool] = True) -> dict:
logger.info("returning cached ticker-data for %s", pair)
return self._cached_ticker[pair]

def get_history(self, pair: str, tick_interval: str,
since_ms: int) -> List:
"""
Gets candle history using asyncio and returns the list of candles.
Handles all async doing.
"""
return asyncio.get_event_loop().run_until_complete(
self._async_get_history(pair=pair, tick_interval=tick_interval,
since_ms=since_ms))

async def _async_get_history(self, pair: str,
tick_interval: str,
since_ms: int) -> List:
# Assume exchange returns 500 candles
_LIMIT = 500

one_call = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 * _LIMIT * 1000
logger.debug("one_call: %s", one_call)
input_coroutines = [self._async_get_candle_history(
pair, tick_interval, since) for since in
range(since_ms, arrow.utcnow().timestamp * 1000, one_call)]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)

# Combine tickers
data: List = []
for tick in tickers:
if tick[0] == pair:
data.extend(tick[1])
# Sort data again after extending the result - above calls return in "async order" order
data = sorted(data, key=lambda x: x[0])
logger.info("downloaded %s with length %s.", pair, len(data))
return data

def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> None:
"""
Refresh tickers asyncronously and return the result.
"""
logger.debug("Refreshing klines for %d pairs", len(pair_list))
asyncio.get_event_loop().run_until_complete(
self.async_get_candles_history(pair_list, ticker_interval))

async def async_get_candles_history(self, pairs: List[str],
tick_interval: str) -> List[Tuple[str, List]]:
"""Download ohlcv history for pair-list asyncronously """
input_coroutines = [self._async_get_candle_history(
symbol, tick_interval) for symbol in pairs]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
return tickers

@retrier_async
async def _async_get_candle_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> Tuple[str, List]:
try:
# fetch ohlcv asynchronously
logger.debug("fetching %s since %s ...", pair, since_ms)

# Calculating ticker interval in second
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60

# If (last update time) + (interval in second) is greater or equal than now
# that means we don't have to hit the API as there is no new candle
# so we fetch it from local cache
if (not since_ms and
self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp):
data = self.klines[pair]
logger.debug("Using cached klines data for %s ...", pair)
else:
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
since=since_ms)

# Because some exchange sort Tickers ASC and other DESC.
# Ex: Bittrex returns a list of tickers ASC (oldest first, newest last)
# when GDAX returns a list of tickers DESC (newest first, oldest last)
data = sorted(data, key=lambda x: x[0])

# keeping last candle time as last refreshed time of the pair
if data:
self._pairs_last_refresh_time[pair] = data[-1][0] // 1000

# keeping candles in cache
self.klines[pair] = data

logger.debug("done fetching %s ...", pair)
return pair, data

except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical candlestick data.'
f'Message: {e}')
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not load ticker history due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch ticker data. Msg: {e}')

@retrier
def get_candle_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]:
Expand Down
16 changes: 10 additions & 6 deletions freqtrade/freqtradebot.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import arrow
import requests

from cachetools import TTLCache, cached

from freqtrade import (DependencyException, OperationalException,
Expand Down Expand Up @@ -181,6 +182,9 @@ def _process(self, nb_assets: Optional[int] = 0) -> bool:
final_list = sanitized_list[:nb_assets] if nb_assets else sanitized_list
self.config['exchange']['pair_whitelist'] = final_list

# Refreshing candles
self.exchange.refresh_tickers(final_list, self.strategy.ticker_interval)

# Query trades from persistence layer
trades = Trade.query.filter(Trade.is_open.is_(True)).all()

Expand Down Expand Up @@ -358,7 +362,7 @@ def _get_min_pair_stake_amount(self, pair: str, price: float) -> Optional[float]
amount_reserve_percent += self.strategy.stoploss
# it should not be more than 50%
amount_reserve_percent = max(amount_reserve_percent, 0.5)
return min(min_stake_amounts)/amount_reserve_percent
return min(min_stake_amounts) / amount_reserve_percent

def create_trade(self) -> bool:
"""
Expand Down Expand Up @@ -387,11 +391,10 @@ def create_trade(self) -> bool:
if not whitelist:
raise DependencyException('No currency pairs in whitelist')

# Pick pair based on buy signals
# running get_signal on historical data fetched
# to find buy signals
for _pair in whitelist:
thistory = self.exchange.get_candle_history(_pair, interval)
(buy, sell) = self.strategy.get_signal(_pair, interval, thistory)

(buy, sell) = self.strategy.get_signal(_pair, interval, self.exchange.klines.get(_pair))
if buy and not sell:
bidstrat_check_depth_of_market = self.config.get('bid_strategy', {}).\
get('check_depth_of_market', {})
Expand All @@ -402,6 +405,7 @@ def create_trade(self) -> bool:
else:
return False
return self.execute_buy(_pair, stake_amount)

return False

def _check_depth_of_market_buy(self, pair: str, conf: Dict) -> bool:
Expand Down Expand Up @@ -581,7 +585,7 @@ def handle_trade(self, trade: Trade) -> bool:
(buy, sell) = (False, False)
experimental = self.config.get('experimental', {})
if experimental.get('use_sell_signal') or experimental.get('ignore_roi_if_buy_signal'):
ticker = self.exchange.get_candle_history(trade.pair, self.strategy.ticker_interval)
ticker = self.exchange.klines.get(trade.pair)
(buy, sell) = self.strategy.get_signal(trade.pair, self.strategy.ticker_interval,
ticker)

Expand Down
14 changes: 8 additions & 6 deletions freqtrade/optimize/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,18 @@ def download_backtesting_testdata(datadir: str,
timerange: Optional[TimeRange] = None) -> None:

"""
Download the latest ticker intervals from the exchange for the pairs passed in parameters
Download the latest ticker intervals from the exchange for the pair passed in parameters
The data is downloaded starting from the last correct ticker interval data that
esists in a cache. If timerange starts earlier than the data in the cache,
exists in a cache. If timerange starts earlier than the data in the cache,
the full data will be redownloaded
Based on @Rybolov work: https://github.com/rybolov/freqtrade-data
:param pairs: list of pairs to download
:param pair: pair to download
:param tick_interval: ticker interval
:param timerange: range of time to download
:return: None
"""

path = make_testdata_path(datadir)
filepair = pair.replace("/", "_")
filename = os.path.join(path, f'{filepair}-{tick_interval}.json')
Expand All @@ -233,8 +232,11 @@ def download_backtesting_testdata(datadir: str,
logger.debug("Current Start: %s", misc.format_ms_time(data[1][0]) if data else 'None')
logger.debug("Current End: %s", misc.format_ms_time(data[-1][0]) if data else 'None')

new_data = exchange.get_candle_history(pair=pair, tick_interval=tick_interval,
since_ms=since_ms)
# Default since_ms to 30 days if nothing is given
new_data = exchange.get_history(pair=pair, tick_interval=tick_interval,
since_ms=since_ms if since_ms
else
int(arrow.utcnow().shift(days=-30).float_timestamp) * 1000)
data.extend(new_data)

logger.debug("New Start: %s", misc.format_ms_time(data[0][0]))
Expand Down
6 changes: 3 additions & 3 deletions freqtrade/optimize/backtesting.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,15 @@ def start(self) -> None:
Run a backtesting end-to-end
:return: None
"""
data = {}
data: Dict[str, Any] = {}
pairs = self.config['exchange']['pair_whitelist']
logger.info('Using stake_currency: %s ...', self.config['stake_currency'])
logger.info('Using stake_amount: %s ...', self.config['stake_amount'])

if self.config.get('live'):
logger.info('Downloading data for all pairs in whitelist ...')
for pair in pairs:
data[pair] = self.exchange.get_candle_history(pair, self.ticker_interval)
self.exchange.refresh_tickers(pairs, self.ticker_interval)
data = self.exchange.klines
else:
logger.info('Using local backtesting data (using whitelist in given config) ...')

Expand Down
5 changes: 3 additions & 2 deletions freqtrade/strategy/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from typing import Dict, List, NamedTuple, Tuple
from typing import Dict, List, NamedTuple, Optional, Tuple
import warnings

import arrow
Expand Down Expand Up @@ -145,7 +145,8 @@ def analyze_ticker(self, ticker_history: List[Dict], metadata: dict) -> DataFram

return dataframe

def get_signal(self, pair: str, interval: str, ticker_hist: List[Dict]) -> Tuple[bool, bool]:
def get_signal(self, pair: str, interval: str,
ticker_hist: Optional[List[Dict]]) -> Tuple[bool, bool]:
"""
Calculates current signal based several technical analysis indicators
:param pair: pair in format ANT/BTC
Expand Down
Loading

0 comments on commit 179bcf3

Please sign in to comment.