Skip to content

Commit

Permalink
merging adi's changes
Browse files Browse the repository at this point in the history
  • Loading branch information
fredfortier committed Apr 12, 2018
2 parents 5caa4ac + 613c773 commit 28cd316
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 72 deletions.
28 changes: 14 additions & 14 deletions catalyst/_protocol.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,7 @@ cdef class BarData:
def current(self, assets, fields):
"""
Returns the current value of the given assets for the given fields
at the current simulation time. Current values are the as-traded price
and are usually not adjusted for events like splits or dividends (see
notes for more information).
at the current simulation time.
Parameters
----------
Expand Down Expand Up @@ -293,20 +291,20 @@ cdef class BarData:
"price" returns the last known close price of the asset. If there is
no last known value (either because the asset has never traded, or
because it has delisted) NaN is returned. If a value is found, and we
had to cross an adjustment boundary (split, dividend, etc) to get it,
the value is adjusted before being returned.
because it has delisted) NaN is returned.
"last_traded" returns the date of the last trade event of the asset,
even if the asset has stopped trading. If there is no last known value,
pd.NaT is returned.
"volume" returns the trade volume for the current simulation time. If
In backtest mode, "volume" returns the trade volume for the current simulation time. If
there is no trade this minute, 0 is returned.
"open", "high", "low", and "close" return the relevant information for
the current trade bar. If there is no current trade bar, NaN is
returned.
In live mode, "volume" returns the last 24 hour trading volume.
"""
multiple_assets = _is_iterable(assets)
multiple_fields = _is_iterable(fields)
Expand Down Expand Up @@ -466,6 +464,7 @@ cdef class BarData:
-------
can_trade : bool or pd.Series[bool] indexed by asset.
"""

dt = self.simulation_dt_func()

if self._adjust_minutes:
Expand Down Expand Up @@ -594,8 +593,12 @@ cdef class BarData:
"""
Returns a window of data for the given assets and fields.
This data is adjusted for splits, dividends, and mergers as of the
current algorithm time.
The label of each candle is left-bound. For example, with 5-minute candles,
the candle labeled 00:00:00 holds the data between 00:00:00 and 00:04:59.
In minute mode, in both live and backtest, the last candle will almost always
be a partial one. For example, if the current time is 00:02:00, the last 5-minute candle,
labeled 00:00:00, holds the data between 00:00:00 and 00:01:59.
The semantics of missing data are identical to the ones described in
the notes for `get_spot_value`.
Expand All @@ -609,7 +612,8 @@ cdef class BarData:
bar_count: integer number of bars of trade data
frequency: string. "1m" for minutely data or "1d" for daily date
frequency: string. "T" for minutely data, "D" for daily data,
"H" for hourly data
Returns
-------
Expand All @@ -630,10 +634,6 @@ cdef class BarData:
Panel is indexed by field, has dt as the major axis, and assets
as the minor axis.
Notes
-----
If the current simulation time is not a valid market time, we use the
last market close instead.
"""
if isinstance(fields, string_types):
single_asset = isinstance(assets, PricingDataAssociable)
Expand Down
78 changes: 49 additions & 29 deletions catalyst/exchange/exchange_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,14 @@ def run(self, data=None, overwrite_sim_params=True):


class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):

def __init__(self, *args, **kwargs):
self.algo_namespace = kwargs.pop('algo_namespace', None)
self.live_graph = kwargs.pop('live_graph', None)
self.stats_output = kwargs.pop('stats_output', None)
self._analyze_live = kwargs.pop('analyze_live', None)
self.end = kwargs.pop('end', None)
self.is_end = kwargs.pop('is_end', True)

self._clock = None
self.frame_stats = list()
Expand Down Expand Up @@ -428,6 +430,37 @@ def __init__(self, *args, **kwargs):
log.warn("Can't initialize signal handler inside another thread."
"Exit should be handled by the user.")

def get_frame_stats(self):
    """
    Build the complete performance stats frame to hand to `analyze`.

    The records still held in memory (``self.frame_stats``) are combined
    with every pickled batch found in the ``frame_stats`` folder located
    under the folder of this algorithm's namespace.

    :return: stats: pd.DataFrame
        Persisted records (when the folder exists) followed by the
        in-memory ones, indexed by 'period_close' (the column is kept
        as well). A frame built from no records is returned empty and
        unindexed instead of raising.
    """
    def _to_indexed_frame(records):
        # A frame built from no records has no 'period_close' column,
        # so `set_index` would raise KeyError on it; leave it untouched.
        frame = pd.DataFrame(records)
        if not frame.empty:
            frame.set_index('period_close', drop=False, inplace=True)
        return frame

    # add the last day stats which is not saved in the directory
    current_stats = _to_indexed_frame(self.frame_stats)

    # get the location of the directory
    algo_folder = get_algo_folder(self.algo_namespace)
    folder = join(algo_folder, 'frame_stats')

    if not exists(folder):
        return current_stats

    # `listdir` returns entries in arbitrary order; sort the names so the
    # row order of the result is deterministic.
    files = sorted(f for f in listdir(folder) if isfile(join(folder, f)))

    period_stats_list = []
    for item in files:
        # NOTE(review): `pickle.load` can execute arbitrary code; this
        # assumes only the algorithm itself writes into this folder.
        with open(join(folder, item), 'rb') as handle:
            period_stats_list.extend(pickle.load(handle))

    stats = _to_indexed_frame(period_stats_list)

    # Only concatenate the frames that actually hold records.
    frames = [frame for frame in (stats, current_stats) if not frame.empty]
    if not frames:
        return current_stats
    return pd.concat(frames)

def interrupt_algorithm(self):
"""
Expand All @@ -451,31 +484,7 @@ def interrupt_algorithm(self):
log.info('Exiting the algorithm. Calling `analyze()` '
'before exiting the algorithm.')

# add the last day stats which is not saved in the directory
current_stats = pd.DataFrame(self.frame_stats)
current_stats.set_index('period_close', drop=False, inplace=True)

# get the location of the directory
algo_folder = get_algo_folder(self.algo_namespace)
folder = join(algo_folder, 'frame_stats')

if exists(folder):
files = [f for f in listdir(folder) if isfile(join(folder, f))]

period_stats_list = []
for item in files:
filename = join(folder, item)

with open(filename, 'rb') as handle:
perf_period = pickle.load(handle)
period_stats_list.extend(perf_period)

stats = pd.DataFrame(period_stats_list)
stats.set_index('period_close', drop=False, inplace=True)

stats = pd.concat([stats, current_stats])
else:
stats = current_stats
stats = self.get_frame_stats()

self.analyze(stats)

Expand Down Expand Up @@ -527,10 +536,12 @@ def _create_clock(self):
self.sim_params.sessions,
context=self,
callback=self._analyze_live,
end=self.end if self.is_end else None
)
else:
self._clock = SimpleClock(
self.sim_params.sessions,
end=self.end if self.is_end else None
)

return self._clock
Expand Down Expand Up @@ -820,10 +831,6 @@ def handle_data(self, data):
if not self.is_running:
return

if self.end is not None and self.end < data.current_dt:
log.info('Algorithm has reached specified end time. Finishing...')
self.interrupt_algorithm()

# Resetting the frame stats every day to minimize memory footprint
today = data.current_dt.floor('1D')
if self.current_day is not None and today > self.current_day:
Expand Down Expand Up @@ -999,6 +1006,19 @@ def _get_open_orders(self, asset=None):

return open_orders

def analyze(self, perf):
    """
    Delegate to the parent `analyze`, feeding it freshly built stats.

    :param perf: accepted for interface compatibility but not used; the
        parent receives the result of `get_frame_stats()` instead.
    """
    parent_analyze = super(ExchangeTradingAlgorithmLive, self).analyze
    parent_analyze(self.get_frame_stats())

def run(self, data=None, overwrite_sim_params=True):
    """
    Run the live algorithm and return its frame stats.

    :param data: object forwarded to the parent `run`; when provided,
        `self.attempts` is attached to it as `data.attempts` first.
    :param overwrite_sim_params: forwarded unchanged to the parent `run`.
    :return: stats: pd.DataFrame as built by `get_frame_stats`; the value
        returned by the parent `run` is discarded.
    """
    # `data` defaults to None, in which case there is nothing to attach
    # the attempts to (assigning unconditionally raised AttributeError).
    if data is not None:
        data.attempts = self.attempts

    super(ExchangeTradingAlgorithmLive, self).run(
        data, overwrite_sim_params
    )
    # Rebuilding the stats to support minute data
    return self.get_frame_stats()

@error_keywords(sid='Keyword argument `sid` is no longer supported for '
'get_open_orders. Use `asset` instead.')
@api_method
Expand Down
10 changes: 8 additions & 2 deletions catalyst/exchange/live_graph_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from catalyst.exchange.utils.stats_utils import prepare_stats
from catalyst.gens.sim_engine import (
BAR,
SESSION_START
SESSION_START,
SESSION_END,
)
from logbook import Logger

Expand Down Expand Up @@ -37,14 +38,15 @@ class LiveGraphClock(object):
"""

def __init__(self, sessions, context, callback=None,
time_skew=pd.Timedelta('0s')):
time_skew=pd.Timedelta('0s'), end=None):

self.sessions = sessions
self.time_skew = time_skew
self._last_emit = None
self._before_trading_start_bar_yielded = True
self.context = context
self.callback = callback
self.end = end

def __iter__(self):
from matplotlib import pyplot as plt
Expand All @@ -54,6 +56,8 @@ def __iter__(self):
current_time = pd.Timestamp.utcnow()
current_minute = current_time.floor('1T')

if self.end is not None and current_minute >= self.end:
break
if self._last_emit is None or current_minute > self._last_emit:
log.debug('emitting minutely bar: {}'.format(current_minute))

Expand All @@ -72,3 +76,5 @@ def __iter__(self):

# Workaround: https://stackoverflow.com/a/33050617/814633
plt.pause(1)

yield current_minute, SESSION_END
10 changes: 8 additions & 2 deletions catalyst/exchange/simple_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from catalyst.constants import LOG_LEVEL
from catalyst.gens.sim_engine import (
BAR,
SESSION_START
SESSION_START,
SESSION_END,
)
from logbook import Logger

Expand All @@ -37,12 +38,13 @@ class SimpleClock(object):
the Broker and the live trading machine's clock.
"""

def __init__(self, sessions, time_skew=pd.Timedelta("0s")):
def __init__(self, sessions, time_skew=pd.Timedelta("0s"), end=None):

self.sessions = sessions
self.time_skew = time_skew
self._last_emit = None
self._before_trading_start_bar_yielded = True
self.end = end

def __iter__(self):
yield pd.Timestamp.utcnow(), SESSION_START
Expand All @@ -51,10 +53,14 @@ def __iter__(self):
current_time = pd.Timestamp.utcnow()
current_minute = current_time.floor('1 min')

if self.end is not None and current_minute >= self.end:
break
if self._last_emit is None or current_minute > self._last_emit:
log.debug('emitting minutely bar: {}'.format(current_minute))

self._last_emit = current_minute
yield current_minute, BAR
else:
sleep(1)

yield current_minute, SESSION_END
16 changes: 8 additions & 8 deletions catalyst/finance/performance/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,14 @@ def handle_simulation_end(self):
When the simulation is complete, run the full period risk report
and send it out on the results socket.
"""

log_msg = "Simulated {n} trading days out of {m}."
log.info(log_msg.format(n=int(self.session_count),
m=self.total_session_count))
log.info("first open: {d}".format(
d=self.sim_params.first_open))
log.info("last close: {d}".format(
d=self.sim_params.last_close))
if self.sim_params.arena == 'backtest':
log_msg = "Simulated {n} trading days out of {m}."
log.info(log_msg.format(n=int(self.session_count),
m=self.total_session_count))
log.info("first open: {d}".format(
d=self.sim_params.first_open))
log.info("last close: {d}".format(
d=self.sim_params.last_close))

bms = pd.Series(
index=self.cumulative_risk_metrics.cont_index,
Expand Down
Loading

0 comments on commit 28cd316

Please sign in to comment.