Skip to content

Commit

Permalink
BLD: added graceful finish when given an end_date in live
Browse files Browse the repository at this point in the history
  • Loading branch information
AvishaiW committed Apr 10, 2018
1 parent 6782c4f commit 77a07fd
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 41 deletions.
78 changes: 49 additions & 29 deletions catalyst/exchange/exchange_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,14 @@ def run(self, data=None, overwrite_sim_params=True):


class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):

def __init__(self, *args, **kwargs):
self.algo_namespace = kwargs.pop('algo_namespace', None)
self.live_graph = kwargs.pop('live_graph', None)
self.stats_output = kwargs.pop('stats_output', None)
self._analyze_live = kwargs.pop('analyze_live', None)
self.end = kwargs.pop('end', None)
self.is_end = kwargs.pop('is_end', True)

self._clock = None
self.frame_stats = list()
Expand Down Expand Up @@ -428,6 +430,37 @@ def __init__(self, *args, **kwargs):
log.warn("Can't initialize signal handler inside another thread."
"Exit should be handled by the user.")

def get_frame_stats(self):
"""
preparing the stats before analyze
:return: stats: pd.Dataframe
"""
# add the last day stats which is not saved in the directory
current_stats = pd.DataFrame(self.frame_stats)
current_stats.set_index('period_close', drop=False, inplace=True)

# get the location of the directory
algo_folder = get_algo_folder(self.algo_namespace)
folder = join(algo_folder, 'frame_stats')

if exists(folder):
files = [f for f in listdir(folder) if isfile(join(folder, f))]

period_stats_list = []
for item in files:
filename = join(folder, item)

with open(filename, 'rb') as handle:
perf_period = pickle.load(handle)
period_stats_list.extend(perf_period)

stats = pd.DataFrame(period_stats_list)
stats.set_index('period_close', drop=False, inplace=True)

return pd.concat([stats, current_stats])
else:
return current_stats

def interrupt_algorithm(self):
"""
Expand All @@ -451,31 +484,7 @@ def interrupt_algorithm(self):
log.info('Exiting the algorithm. Calling `analyze()` '
'before exiting the algorithm.')

# add the last day stats which is not saved in the directory
current_stats = pd.DataFrame(self.frame_stats)
current_stats.set_index('period_close', drop=False, inplace=True)

# get the location of the directory
algo_folder = get_algo_folder(self.algo_namespace)
folder = join(algo_folder, 'frame_stats')

if exists(folder):
files = [f for f in listdir(folder) if isfile(join(folder, f))]

period_stats_list = []
for item in files:
filename = join(folder, item)

with open(filename, 'rb') as handle:
perf_period = pickle.load(handle)
period_stats_list.extend(perf_period)

stats = pd.DataFrame(period_stats_list)
stats.set_index('period_close', drop=False, inplace=True)

stats = pd.concat([stats, current_stats])
else:
stats = current_stats
stats = self.get_frame_stats()

self.analyze(stats)

Expand Down Expand Up @@ -527,10 +536,12 @@ def _create_clock(self):
self.sim_params.sessions,
context=self,
callback=self._analyze_live,
end=self.end if self.is_end else None
)
else:
self._clock = SimpleClock(
self.sim_params.sessions,
end=self.end if self.is_end else None
)

return self._clock
Expand Down Expand Up @@ -820,10 +831,6 @@ def handle_data(self, data):
if not self.is_running:
return

if self.end is not None and self.end < data.current_dt:
log.info('Algorithm has reached specified end time. Finishing...')
self.interrupt_algorithm()

# Resetting the frame stats every day to minimize memory footprint
today = data.current_dt.floor('1D')
if self.current_day is not None and today > self.current_day:
Expand Down Expand Up @@ -999,6 +1006,19 @@ def _get_open_orders(self, asset=None):

return open_orders

def analyze(self, perf):
super(ExchangeTradingAlgorithmLive, self)\
.analyze(self.get_frame_stats())

def run(self, data=None, overwrite_sim_params=True):
data.attempts = self.attempts
perf = super(ExchangeTradingAlgorithmLive, self).run(
data, overwrite_sim_params
)
# Rebuilding the stats to support minute data
stats = self.get_frame_stats()
return stats

@error_keywords(sid='Keyword argument `sid` is no longer supported for '
'get_open_orders. Use `asset` instead.')
@api_method
Expand Down
10 changes: 8 additions & 2 deletions catalyst/exchange/live_graph_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from catalyst.exchange.utils.stats_utils import prepare_stats
from catalyst.gens.sim_engine import (
BAR,
SESSION_START
SESSION_START,
SESSION_END,
)
from logbook import Logger

Expand Down Expand Up @@ -37,14 +38,15 @@ class LiveGraphClock(object):
"""

def __init__(self, sessions, context, callback=None,
time_skew=pd.Timedelta('0s')):
time_skew=pd.Timedelta('0s'), end=None):

self.sessions = sessions
self.time_skew = time_skew
self._last_emit = None
self._before_trading_start_bar_yielded = True
self.context = context
self.callback = callback
self.end = end

def __iter__(self):
from matplotlib import pyplot as plt
Expand All @@ -54,6 +56,8 @@ def __iter__(self):
current_time = pd.Timestamp.utcnow()
current_minute = current_time.floor('1T')

if self.end is not None and current_minute >= self.end:
break
if self._last_emit is None or current_minute > self._last_emit:
log.debug('emitting minutely bar: {}'.format(current_minute))

Expand All @@ -72,3 +76,5 @@ def __iter__(self):

# Workaround: https://stackoverflow.com/a/33050617/814633
plt.pause(1)

yield current_minute, SESSION_END
10 changes: 8 additions & 2 deletions catalyst/exchange/simple_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from catalyst.constants import LOG_LEVEL
from catalyst.gens.sim_engine import (
BAR,
SESSION_START
SESSION_START,
SESSION_END,
)
from logbook import Logger

Expand All @@ -37,12 +38,13 @@ class SimpleClock(object):
the Broker and the live trading machine's clock.
"""

def __init__(self, sessions, time_skew=pd.Timedelta("0s")):
def __init__(self, sessions, time_skew=pd.Timedelta("0s"), end=None):

self.sessions = sessions
self.time_skew = time_skew
self._last_emit = None
self._before_trading_start_bar_yielded = True
self.end = end

def __iter__(self):
yield pd.Timestamp.utcnow(), SESSION_START
Expand All @@ -51,10 +53,14 @@ def __iter__(self):
current_time = pd.Timestamp.utcnow()
current_minute = current_time.floor('1 min')

if self.end is not None and current_minute >= self.end:
break
if self._last_emit is None or current_minute > self._last_emit:
log.debug('emitting minutely bar: {}'.format(current_minute))

self._last_emit = current_minute
yield current_minute, BAR
else:
sleep(1)

yield current_minute, SESSION_END
16 changes: 8 additions & 8 deletions catalyst/finance/performance/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,14 @@ def handle_simulation_end(self):
When the simulation is complete, run the full period risk report
and send it out on the results socket.
"""

log_msg = "Simulated {n} trading days out of {m}."
log.info(log_msg.format(n=int(self.session_count),
m=self.total_session_count))
log.info("first open: {d}".format(
d=self.sim_params.first_open))
log.info("last close: {d}".format(
d=self.sim_params.last_close))
if self.sim_params.arena == 'backtest':
log_msg = "Simulated {n} trading days out of {m}."
log.info(log_msg.format(n=int(self.session_count),
m=self.total_session_count))
log.info("first open: {d}".format(
d=self.sim_params.first_open))
log.info("last close: {d}".format(
d=self.sim_params.last_close))

bms = pd.Series(
index=self.cumulative_risk_metrics.cont_index,
Expand Down
6 changes: 6 additions & 0 deletions catalyst/utils/run_algo.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,13 @@ def choose_loader(column):
start = pd.Timestamp.utcnow()

# TODO: fix the end data.
# is_end checks if an end date was specified by user
# needed for live clock
is_end = True

if end is None:
end = start + timedelta(hours=8760)
is_end = False

data = DataPortalExchangeLive(
exchanges=exchanges,
Expand Down Expand Up @@ -253,6 +258,7 @@ def choose_loader(column):
stats_output=stats_output,
analyze_live=analyze_live,
end=end,
is_end=is_end,
)
elif exchanges:
# Removed the existing Poloniex fork to keep things simple
Expand Down

0 comments on commit 77a07fd

Please sign in to comment.