merge develop into track-current-candle
robcaulk committed Nov 3, 2022
2 parents db94232 + c2130ed commit 444a068
Showing 12 changed files with 121 additions and 60 deletions.
12 changes: 12 additions & 0 deletions docs/includes/pairlists.md
@@ -286,6 +286,18 @@ Min price precision for SHITCOIN/BTC is 8 decimals. If its price is 0.00000011 -

Shuffles (randomizes) pairs in the pairlist. It can be used to prevent the bot from trading some of the pairs more frequently than others when you want all pairs to be treated with the same priority.

By default, ShuffleFilter will shuffle pairs once per candle.
To shuffle on every iteration, set `"shuffle_frequency"` to `"iteration"` instead of the default of `"candle"`.

``` json
{
    "method": "ShuffleFilter",
    "shuffle_frequency": "candle",
    "seed": 42
}
```

!!! Tip
    You may set the `seed` value for this Pairlist to obtain reproducible results, which can be useful for repeated backtesting sessions. If `seed` is not set, the pairs are shuffled in a non-repeatable random order. ShuffleFilter will automatically detect the run mode and apply the `seed` only for backtesting modes - if a `seed` value is set.
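
As an illustration (assuming the usual `pairlists` configuration structure, with a placeholder pairlist ahead of the filter), shuffling on every iteration could be configured like this:

``` json
"pairlists": [
    {"method": "StaticPairList"},
    {
        "method": "ShuffleFilter",
        "shuffle_frequency": "iteration",
        "seed": 42
    }
]
```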

24 changes: 17 additions & 7 deletions freqtrade/freqai/data_kitchen.py
@@ -1316,28 +1316,38 @@ def get_backtesting_prediction(
append_df = pd.read_hdf(self.backtesting_results_path)
return append_df

def check_if_backtest_prediction_exists(
self
def check_if_backtest_prediction_is_valid(
self,
length_backtesting_dataframe: int
) -> bool:
"""
Check if a backtesting prediction already exists
:param dk: FreqaiDataKitchen
Check if a backtesting prediction already exists and if the predictions
to append have the same size as the backtesting dataframe slice
:param length_backtesting_dataframe: Length of backtesting dataframe slice
:return:
:boolean: whether the prediction file exists or not.
:boolean: whether the prediction file is valid.
"""
path_to_predictionfile = Path(self.full_path /
self.backtest_predictions_folder /
f"{self.model_filename}_prediction.h5")
self.backtesting_results_path = path_to_predictionfile

file_exists = path_to_predictionfile.is_file()

if file_exists:
logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
append_df = self.get_backtesting_prediction()
if len(append_df) == length_backtesting_dataframe:
logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
return True
else:
logger.info("A new backtesting prediction file is required. "
"(Number of predictions is different from dataframe length).")
return False
else:
logger.info(
f"Could not find backtesting prediction file at {path_to_predictionfile}"
)
return file_exists
return False

def remove_special_chars_from_feature_names(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"""
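For illustration, a standalone sketch of the new validity check (hypothetical helper, not the project's API): a cached prediction is only reused when the file exists and its row count matches the current backtesting slice.

```python
from pathlib import Path

import pandas as pd


def backtest_prediction_is_valid(path: Path, expected_rows: int) -> bool:
    # No cached prediction at all -> a fresh prediction run is required.
    if not path.is_file():
        return False
    # A cached file whose row count differs from the current backtesting
    # slice is stale and must not be reused.
    cached = pd.read_hdf(path)
    return len(cached) == expected_rows
```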
2 changes: 1 addition & 1 deletion freqtrade/freqai/freqai_interface.py
@@ -277,7 +277,7 @@ def start_backtesting(

dk.set_new_model_names(pair, trained_timestamp)

if dk.check_if_backtest_prediction_exists():
if dk.check_if_backtest_prediction_is_valid(len(dataframe_backtest)):
self.dd.load_metadata(dk)
dk.find_features(dataframe_train)
self.check_if_feature_list_matches_strategy(dk)
1 change: 0 additions & 1 deletion freqtrade/plugins/pairlist/IPairList.py
@@ -36,7 +36,6 @@ def __init__(self, exchange: Exchange, pairlistmanager,
self._pairlistconfig = pairlistconfig
self._pairlist_pos = pairlist_pos
self.refresh_period = self._pairlistconfig.get('refresh_period', 1800)
self._last_refresh = 0
LoggingMixin.__init__(self, logger, self.refresh_period)

@property
17 changes: 15 additions & 2 deletions freqtrade/plugins/pairlist/ShuffleFilter.py
@@ -3,16 +3,20 @@
"""
import logging
import random
from typing import Any, Dict, List
from typing import Any, Dict, List, Literal

from freqtrade.constants import Config
from freqtrade.enums import RunMode
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.exchange.types import Tickers
from freqtrade.plugins.pairlist.IPairList import IPairList
from freqtrade.util.periodic_cache import PeriodicCache


logger = logging.getLogger(__name__)

ShuffleValues = Literal['candle', 'iteration']


class ShuffleFilter(IPairList):

@@ -31,6 +35,9 @@ def __init__(self, exchange, pairlistmanager,
logger.info(f"Backtesting mode detected, applying seed value: {self._seed}")

self._random = random.Random(self._seed)
self._shuffle_freq: ShuffleValues = pairlistconfig.get('shuffle_frequency', 'candle')
self.__pairlist_cache = PeriodicCache(
maxsize=1000, ttl=timeframe_to_seconds(self._config['timeframe']))

@property
def needstickers(self) -> bool:
@@ -45,7 +52,7 @@ def short_desc(self) -> str:
"""
Short whitelist method description - used for startup-messages
"""
return (f"{self.name} - Shuffling pairs" +
return (f"{self.name} - Shuffling pairs every {self._shuffle_freq}" +
(f", seed = {self._seed}." if self._seed is not None else "."))

def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]:
@@ -56,7 +63,13 @@ def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]:
:param tickers: Tickers (from exchange.get_tickers). May be cached.
:return: new whitelist
"""
pairlist_bef = tuple(pairlist)
pairlist_new = self.__pairlist_cache.get(pairlist_bef)
if pairlist_new and self._shuffle_freq == 'candle':
# Use cached pairlist.
return pairlist_new
# Shuffle is done inplace
self._random.shuffle(pairlist)
self.__pairlist_cache[pairlist_bef] = pairlist

return pairlist
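A minimal sketch of the once-per-candle caching idea, using `cachetools.TTLCache` as a stand-in for freqtrade's `PeriodicCache` (an assumption made purely for illustration): the shuffled order is cached against the incoming pairlist, so repeated calls within one candle return the same order.

```python
import random

from cachetools import TTLCache

candle_seconds = 300                      # e.g. a 5m timeframe (assumed)
cache: TTLCache = TTLCache(maxsize=1000, ttl=candle_seconds)
rng = random.Random(42)


def shuffle_once_per_candle(pairlist: list) -> list:
    key = tuple(pairlist)                 # hashable key for the incoming list
    cached = cache.get(key)
    if cached:
        return cached                     # same order until the TTL expires
    rng.shuffle(pairlist)                 # shuffle is done in place
    cache[key] = pairlist
    return pairlist
```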
10 changes: 2 additions & 8 deletions freqtrade/rpc/api_server/api_ws.py
@@ -127,13 +127,6 @@ async def message_endpoint(
except Exception as e:
logger.info(f"Consumer connection failed - {channel}: {e}")
logger.debug(e, exc_info=e)
finally:
await channel_manager.on_disconnect(ws)

else:
if channel:
await channel_manager.on_disconnect(ws)
await ws.close()

except RuntimeError:
# WebSocket was closed
@@ -144,4 +137,5 @@ async def message_endpoint(
# Log tracebacks to keep track of what errors are happening
logger.exception(e)
finally:
await channel_manager.on_disconnect(ws)
if channel:
await channel_manager.on_disconnect(ws)
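The refactor above moves all disconnect handling into one `finally` block guarded by `if channel:`. A rough sketch of that shape, with hypothetical names standing in for the real handler:

```python
async def handle_connection(ws, channel_manager):
    """Hypothetical handler illustrating the single-cleanup pattern."""
    channel = None
    try:
        channel = await channel_manager.on_connect(ws)   # hypothetical call
        while True:
            message = await ws.receive_text()            # hypothetical call
            await channel.handle(message)                # hypothetical call
    finally:
        # Disconnect exactly once, and only if a channel was actually created.
        if channel:
            await channel_manager.on_disconnect(ws)
```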
4 changes: 4 additions & 0 deletions freqtrade/rpc/api_server/webserver.py
@@ -197,6 +197,7 @@ async def _broadcast_queue_data(self):
# Get data from queue
message: WSMessageSchemaType = await async_queue.get()
logger.debug(f"Found message of type: {message.get('type')}")
async_queue.task_done()
# Broadcast it
await self._ws_channel_manager.broadcast(message)
except asyncio.CancelledError:
@@ -210,6 +211,9 @@ async def _broadcast_queue_data(self):
# Disconnect channels and stop the loop on cancel
await self._ws_channel_manager.disconnect_all()
self._ws_loop.stop()
# Avoid adding more items to the queue if they aren't
# going to get broadcasted.
self._ws_queue = None

def start_api(self):
"""
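The two additions above pair up: `task_done()` acknowledges each message pulled from the queue, and clearing `self._ws_queue` on shutdown keeps producers from filling a queue nobody drains. A simplified sketch (hypothetical function, not the project's code):

```python
import asyncio


async def broadcast_loop(queue: asyncio.Queue, broadcast) -> None:
    """Hypothetical consumer illustrating the acknowledge-then-broadcast pattern."""
    try:
        while True:
            message = await queue.get()
            queue.task_done()        # acknowledge so queue.join() cannot hang
            await broadcast(message)
    except asyncio.CancelledError:
        # On shutdown the real code also drops its queue reference, so
        # producers stop enqueuing messages that will never be broadcast.
        pass
```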
37 changes: 27 additions & 10 deletions freqtrade/rpc/api_server/ws/channel.py
@@ -1,5 +1,6 @@
import asyncio
import logging
import time
from threading import RLock
from typing import Any, Dict, List, Optional, Type, Union
from uuid import uuid4
@@ -46,7 +47,7 @@ def __init__(
self._relay_task = asyncio.create_task(self.relay())

# Internal event to signify a closed websocket
self._closed = False
self._closed = asyncio.Event()

# Wrap the WebSocket in the Serializing class
self._wrapped_ws = self._serializer_cls(self._websocket)
@@ -73,15 +74,26 @@ async def send(self, data) -> bool:
Add the data to the queue to be sent.
:returns: True if data added to queue, False otherwise
"""

# This block only runs if the queue is full; it will wait
# up to self.drain_timeout for the relay to drain the outgoing queue
# We can't use asyncio.wait_for here because the queue may have been created with a
# different eventloop
start = time.time()
while self.queue.full():
await asyncio.sleep(1)
if (time.time() - start) > self.drain_timeout:
return False

# If for some reason the queue is still full, just return False
try:
await asyncio.wait_for(
self.queue.put(data),
timeout=self.drain_timeout
)
return True
except asyncio.TimeoutError:
self.queue.put_nowait(data)
except asyncio.QueueFull:
return False

# If we got here everything is ok
return True

async def recv(self):
"""
Receive data on the wrapped websocket
@@ -99,14 +111,19 @@ async def close(self):
Close the WebSocketChannel
"""

self._closed = True
try:
await self.raw_websocket.close()
except Exception:
pass

self._closed.set()
self._relay_task.cancel()

def is_closed(self) -> bool:
"""
Closed flag
"""
return self._closed
return self._closed.is_set()

def set_subscriptions(self, subscriptions: List[str] = []) -> None:
"""
@@ -129,7 +146,7 @@ async def relay(self):
Relay messages from the channel's queue and send them out. This is started
as a task.
"""
while True:
while not self._closed.is_set():
message = await self.queue.get()
try:
await self._send(message)
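Taken together, the channel changes replace a boolean closed flag with an `asyncio.Event` and bound how long `send()` waits on a full queue. A condensed sketch under a hypothetical `TinyChannel` class:

```python
import asyncio
import time


class TinyChannel:
    def __init__(self, drain_timeout: float = 3.0):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        self.drain_timeout = drain_timeout
        self._closed = asyncio.Event()      # event instead of a plain bool

    async def send(self, data) -> bool:
        # Poll instead of asyncio.wait_for, since the queue may belong to
        # another event loop; give up after drain_timeout seconds.
        start = time.time()
        while self.queue.full():
            await asyncio.sleep(1)
            if (time.time() - start) > self.drain_timeout:
                return False
        try:
            self.queue.put_nowait(data)
        except asyncio.QueueFull:
            return False
        return True

    def close(self) -> None:
        self._closed.set()                  # lets a relay loop exit cleanly

    def is_closed(self) -> bool:
        return self._closed.is_set()
```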
8 changes: 4 additions & 4 deletions freqtrade/rpc/external_message_consumer.py
@@ -264,10 +264,10 @@ async def _receive_messages(
# We haven't received data yet. Check the connection and continue.
try:
# ping
ping = await channel.ping()
pong = await channel.ping()
latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)

await asyncio.wait_for(ping, timeout=self.ping_timeout)
logger.debug(f"Connection to {channel} still alive...")
logger.info(f"Connection to {channel} still alive, latency: {latency}ms")

continue
except (websockets.exceptions.ConnectionClosed):
@@ -276,7 +276,7 @@ async def _receive_messages(
await asyncio.sleep(self.sleep_time)
break
except Exception as e:
logger.warning(f"Ping error {channel} - retrying in {self.sleep_time}s")
logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
logger.debug(e, exc_info=e)
await asyncio.sleep(self.sleep_time)

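The ping handling now reports latency instead of only logging liveness. A self-contained sketch that measures the round trip directly, so it does not depend on what awaiting the pong waiter returns (names here are illustrative):

```python
import asyncio
import time

import websockets


async def ping_with_latency(ws: websockets.WebSocketClientProtocol,
                            timeout: float = 10.0) -> float:
    """Send a ping and return the measured round-trip latency in milliseconds."""
    start = time.time()
    pong_waiter = await ws.ping()                 # returns a waiter future
    await asyncio.wait_for(pong_waiter, timeout=timeout)
    return (time.time() - start) * 1000
```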
31 changes: 14 additions & 17 deletions scripts/ws_client.py
@@ -18,7 +18,6 @@
import pandas
import rapidjson
import websockets
from dateutil.relativedelta import relativedelta


logger = logging.getLogger("WebSocketClient")
@@ -28,7 +27,7 @@

def setup_logging(filename: str):
logging.basicConfig(
level=logging.INFO,
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(filename),
@@ -75,16 +74,15 @@ def load_config(configfile):

def readable_timedelta(delta):
"""
Convert a dateutil.relativedelta to a readable format
Convert a millisecond delta to a readable format
:param delta: A dateutil.relativedelta
:param delta: A delta between two timestamps in milliseconds
:returns: The readable time difference string
"""
attrs = ['years', 'months', 'days', 'hours', 'minutes', 'seconds', 'microseconds']
return ", ".join([
'%d %s' % (getattr(delta, attr), attr if getattr(delta, attr) > 0 else attr[:-1])
for attr in attrs if getattr(delta, attr)
])
seconds, milliseconds = divmod(delta, 1000)
minutes, seconds = divmod(seconds, 60)

return f"{int(minutes)}:{int(seconds)}.{int(milliseconds)}"

# ----------------------------------------------------------------------------

@@ -170,8 +168,8 @@ async def on_message(self, websocket, name, message):

def _calculate_time_difference(self):
old_last_received_at = self._LAST_RECEIVED_AT
self._LAST_RECEIVED_AT = time.time() * 1e6
time_delta = relativedelta(microseconds=(self._LAST_RECEIVED_AT - old_last_received_at))
self._LAST_RECEIVED_AT = time.time() * 1e3
time_delta = self._LAST_RECEIVED_AT - old_last_received_at

return readable_timedelta(time_delta)

@@ -242,12 +240,10 @@ async def create_client(
):
# Try pinging
try:
pong = ws.ping()
await asyncio.wait_for(
pong,
timeout=ping_timeout
)
logger.info("Connection still alive...")
pong = await ws.ping()
latency = (await asyncio.wait_for(pong, timeout=ping_timeout) * 1000)

logger.info(f"Connection still alive, latency: {latency}ms")

continue

@@ -272,6 +268,7 @@ async def create_client(
websockets.exceptions.ConnectionClosedError,
websockets.exceptions.ConnectionClosedOK
):
logger.info("Connection was closed")
# Just keep trying to connect again indefinitely
await asyncio.sleep(sleep_time)

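For reference, the new millisecond-based formatter can be exercised on its own; the sample values below are chosen purely for illustration:

```python
def readable_timedelta(delta):
    """Format a millisecond delta as minutes:seconds.milliseconds."""
    seconds, milliseconds = divmod(delta, 1000)
    minutes, seconds = divmod(seconds, 60)
    return f"{int(minutes)}:{int(seconds)}.{int(milliseconds)}"


print(readable_timedelta(83456))   # -> 1:23.456
print(readable_timedelta(250))     # -> 0:0.250
```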
14 changes: 8 additions & 6 deletions tests/freqai/test_freqai_interface.py
@@ -27,20 +27,22 @@ def is_mac() -> bool:
return "Darwin" in machine


@pytest.mark.parametrize('model', [
'LightGBMRegressor',
'XGBoostRegressor',
'XGBoostRFRegressor',
'CatboostRegressor',
@pytest.mark.parametrize('model, pca, dbscan', [
('LightGBMRegressor', True, False),
('XGBoostRegressor', False, True),
('XGBoostRFRegressor', False, False),
('CatboostRegressor', False, False),
])
def test_extract_data_and_train_model_Standard(mocker, freqai_conf, model):
def test_extract_data_and_train_model_Standard(mocker, freqai_conf, model, pca, dbscan):
if is_arm() and model == 'CatboostRegressor':
pytest.skip("CatBoost is not supported on ARM")

model_save_ext = 'joblib'
freqai_conf.update({"freqaimodel": model})
freqai_conf.update({"timerange": "20180110-20180130"})
freqai_conf.update({"strategy": "freqai_test_strat"})
freqai_conf['freqai']['feature_parameters'].update({"principal_component_analysis": pca})
freqai_conf['freqai']['feature_parameters'].update({"use_DBSCAN_to_remove_outliers": dbscan})

strategy = get_patched_freqai_strategy(mocker, freqai_conf)
exchange = get_patched_exchange(mocker, freqai_conf)
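The test now parametrizes over (model, pca, dbscan) tuples. A minimal, generic illustration of that pytest pattern (not the project's actual test):

```python
import pytest


@pytest.mark.parametrize('model, pca, dbscan', [
    ('LightGBMRegressor', True, False),
    ('XGBoostRegressor', False, True),
])
def test_combo(model: str, pca: bool, dbscan: bool):
    # Each tuple supplies one value per name in the parameter string.
    assert isinstance(model, str)
    assert isinstance(pca, bool) and isinstance(dbscan, bool)
```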