Skip to content

Commit

Permalink
Simulation speed improvements (abides-sim#14)
Browse files Browse the repository at this point in the history
* Added tag attribute to Order, LimitOrder, TradingAgent.placeLimitOrder, TradingAgent.placeMarketOrder. (abides-sim#6)

* bug fixes and enhancements (abides-sim#7)

* modifications to the subscription API to record the exchange timestamp in the published market data message

* market order changes to account for latency and non-zero jitter

* f string removal from all execution agents

* Mm bugfix and Orderbook efficiency improvements (abides-sim#13)

* modifications to the subscription API to record the exchange timestamp in the published market data message

* market order changes to account for latency and non-zero jitter

* f string removal from all execution agents

* Custom serialization for order logging and deep copying. (abides-sim#8)

* Dd serialize orders (abides-sim#11)

* Custom serialization for order logging and deep copying.

* Orderbook updates (abides-sim#10)

* Commit makes the following 3 changes to orderbook:

  1. Speeds up transacted volume computation
  2. Speeds up conversion of log to DataFrame format
  3. Adds `wide-book` compatibility for `book-freq` != 0

* PR10 fix -- removed redundant print statement

* Order._order_ids uses a set instead of a list to manage order IDs

* Dd orderbook updates (abides-sim#12)

* Commit makes the following 3 changes to orderbook:

  1. Speeds up transacted volume computation
  2. Speeds up conversion of log to DataFrame format
  3. Adds `wide-book` compatibility for `book-freq` != 0

* Custom serialization for order logging and deep copying.

* PR10 fix -- removed redundant print statement

* Order._order_ids uses a set instead of a list to manage order IDs

Co-authored-by: Danial Dervovic <[email protected]>
Co-authored-by: Mahmoud Mahfouz <[email protected]>
Co-authored-by: David Byrd <[email protected]>
  • Loading branch information
4 people committed Jun 16, 2020
1 parent 8b4b76c commit bb49f41
Showing 13 changed files with 377 additions and 295 deletions.
53 changes: 27 additions & 26 deletions agent/ExchangeAgent.py
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)

import jsons as js
import pandas as pd
pd.set_option('display.max_rows', 500)

@@ -62,7 +61,6 @@ def __init__(self, id, name, type, mkt_open, mkt_close, symbols, book_freq='S',

# Store orderbook in wide format? ONLY WORKS with book_freq == 0
self.wide_book = wide_book
self.wide_book_warning()

# The subscription dict is a dictionary with the key = agent ID,
# value = dict (key = symbol, value = list [levels (no of levels to recieve updates for),
@@ -128,7 +126,7 @@ def receiveMessage(self, currentTime, msg):
if currentTime > self.mkt_close:
# Most messages after close will receive a 'MKT_CLOSED' message in response. A few things
# might still be processed, like requests for final trade prices or such.
if msg.body['msg'] in ['LIMIT_ORDER', 'CANCEL_ORDER', 'MODIFY_ORDER']:
if msg.body['msg'] in ['LIMIT_ORDER', 'MARKET_ORDER', 'CANCEL_ORDER', 'MODIFY_ORDER']:
log_print("{} received {}: {}", self.name, msg.body['msg'], msg.body['order'])
self.sendMessage(msg.body['sender'], Message({"msg": "MKT_CLOSED"}))

@@ -146,8 +144,8 @@ def receiveMessage(self, currentTime, msg):
return

# Log order messages only if that option is configured. Log all other messages.
if msg.body['msg'] in ['LIMIT_ORDER', 'CANCEL_ORDER']:
if self.log_orders: self.logEvent(msg.body['msg'], js.dump(msg.body['order'], strip_privates=True))
if msg.body['msg'] in ['LIMIT_ORDER', 'MARKET_ORDER', 'CANCEL_ORDER', 'MODIFY_ORDER']:
if self.log_orders: self.logEvent(msg.body['msg'], msg.body['order'].to_dict())
else:
self.logEvent(msg.body['msg'], msg.body['sender'])

@@ -241,11 +239,20 @@ def receiveMessage(self, currentTime, msg):
order = msg.body['order']
log_print("{} received LIMIT_ORDER: {}", self.name, order)
if order.symbol not in self.order_books:
log_print("Order discarded. Unknown symbol: {}", order.symbol)
log_print("Limit Order discarded. Unknown symbol: {}", order.symbol)
else:
# Hand the order to the order book for processing.
self.order_books[order.symbol].handleLimitOrder(deepcopy(order))
self.publishOrderBookData()
elif msg.body['msg'] == "MARKET_ORDER":
order = msg.body['order']
log_print("{} received MARKET_ORDER: {}", self.name, order)
if order.symbol not in self.order_books:
log_print("Market Order discarded. Unknown symbol: {}", order.symbol)
else:
# Hand the market order to the order book for processing.
self.order_books[order.symbol].handleMarketOrder(deepcopy(order))
self.publishOrderBookData()
elif msg.body['msg'] == "CANCEL_ORDER":
# Note: this is somewhat open to abuse, as in theory agents could cancel other agents' orders.
# An agent could also become confused if they receive a (partial) execution on an order they
@@ -305,7 +312,8 @@ def publishOrderBookData(self):
"symbol": symbol,
"bids": self.order_books[symbol].getInsideBids(levels),
"asks": self.order_books[symbol].getInsideAsks(levels),
"last_transaction": self.order_books[symbol].last_trade}))
"last_transaction": self.order_books[symbol].last_trade,
"exchange_ts": self.currentTime}))
self.subscription_dict[agent_id][symbol][2] = orderbook_last_update

def logOrderBookSnapshots(self, symbol):
@@ -330,15 +338,12 @@ def get_quote_range_iterator(s):
if book.book_log:

print("Logging order book to file...")
dfLog = pd.DataFrame(book.book_log)
dfLog = book.book_log_to_df()
dfLog.set_index('QuoteTime', inplace=True)
dfLog = dfLog[~dfLog.index.duplicated(keep='last')]
dfLog.sort_index(inplace=True)

if str(self.book_freq).isdigit() and int(self.book_freq) == 0: # Save all possible information
# With all order snapshots saved DataFrame is very sparse
dfLog = pd.SparseDataFrame(dfLog)

# Get the full range of quotes at the finest possible resolution.
quotes = get_quote_range_iterator(dfLog.columns.unique())

@@ -361,16 +366,18 @@ def get_quote_range_iterator(s):
time_idx = pd.date_range(self.mkt_open, self.mkt_close, freq=self.book_freq, closed='right')
dfLog = dfLog.reindex(time_idx, method='ffill')
dfLog.sort_index(inplace=True)
dfLog = dfLog.stack()
dfLog.sort_index(inplace=True)

# Get the full range of quotes at the finest possible resolution.
quotes = get_quote_range_iterator(dfLog.index.get_level_values(1).unique())
if not self.wide_book:
dfLog = dfLog.stack()
dfLog.sort_index(inplace=True)

# Restructure the log to have multi-level rows of all possible pairs of time and quote
# with volume as the only column.
filledIndex = pd.MultiIndex.from_product([time_idx, quotes], names=['time', 'quote'])
dfLog = dfLog.reindex(filledIndex)
# Get the full range of quotes at the finest possible resolution.
quotes = get_quote_range_iterator(dfLog.index.get_level_values(1).unique())

# Restructure the log to have multi-level rows of all possible pairs of time and quote
# with volume as the only column.
filledIndex = pd.MultiIndex.from_product([time_idx, quotes], names=['time', 'quote'])
dfLog = dfLog.reindex(filledIndex)

filename = f'ORDERBOOK_{symbol}_FREQ_{self.book_freq}'

@@ -398,7 +405,7 @@ def sendMessage (self, recipientID, msg):
# Messages that require order book modification (not simple queries) incur the additional
# parallel processing delay as configured.
super().sendMessage(recipientID, msg, delay = self.pipeline_delay)
if self.log_orders: self.logEvent(msg.body['msg'], js.dump(msg.body['order'], strip_privates=True))
if self.log_orders: self.logEvent(msg.body['msg'], msg.body['order'].to_dict())
else:
# Other message types incur only the currently-configured computation delay for this agent.
super().sendMessage(recipientID, msg)
@@ -409,9 +416,3 @@ def getMarketOpen(self):

def getMarketClose(self):
return self.__mkt_close

def wide_book_warning(self):
""" Prints warning message about wide orderbook format usage. """
if self.wide_book and (self.book_freq != 0):
log_print(f"WARNING: (wide_book == True) and (book_freq != 0). Orderbook will be logged in column MultiIndex "
"format at frequency {self.book_freq}.")
83 changes: 38 additions & 45 deletions agent/TradingAgent.py
Original file line number Diff line number Diff line change
@@ -2,10 +2,10 @@
from agent.ExchangeAgent import ExchangeAgent
from message.Message import Message
from util.order.LimitOrder import LimitOrder
from util.order.MarketOrder import MarketOrder
from util.util import log_print

from copy import deepcopy
import jsons as js
import sys

# The TradingAgent class (via FinancialAgent, via Agent) is intended as the
@@ -52,6 +52,9 @@ def __init__(self, id, name, type, random_state=None, starting_cash=100000, log_
# that can be used to make it happen.
self.last_trade = {}

# used in subscription mode to record the timestamp for which the data was current in the ExchangeAgent
self.exchange_ts = {}

# When a last trade price comes in after market close, the trading agent
# automatically records it as the daily close price for a symbol.
self.daily_close_price = {}
@@ -240,7 +243,6 @@ def receiveMessage (self, currentTime, msg):
self.query_transacted_volume(msg.body['symbol'], msg.body['transacted_volume'])

elif msg.body['msg'] == 'MARKET_DATA':
# Call the queryMarketData method, which subclasses may extend.
self.handleMarketData(msg)

# Now do we know the market hours?
@@ -285,8 +287,8 @@ def get_transacted_volume(self, symbol, lookback_period='10min'):
# string (valid symbol), int (positive share quantity), bool (True == BUY), int (price in cents).
# The call may optionally specify an order_id (otherwise global autoincrement is used) and
# whether cash or risk limits should be enforced or ignored for the order.
def placeLimitOrder (self, symbol, quantity, is_buy_order, limit_price, order_id=None, ignore_risk = True):
order = LimitOrder(self.id, self.currentTime, symbol, quantity, is_buy_order, limit_price, order_id)
def placeLimitOrder (self, symbol, quantity, is_buy_order, limit_price, order_id=None, ignore_risk = True, tag = None):
order = LimitOrder(self.id, self.currentTime, symbol, quantity, is_buy_order, limit_price, order_id, tag)

if quantity > 0:
# Test if this order can be permitted given our at-risk limits.
@@ -317,55 +319,44 @@ def placeLimitOrder (self, symbol, quantity, is_buy_order, limit_price, order_id
"order" : order }))

# Log this activity.
if self.log_orders: self.logEvent('ORDER_SUBMITTED', js.dump(order, strip_privates=True))
if self.log_orders: self.logEvent('ORDER_SUBMITTED', order.to_dict())

else:
log_print ("TradingAgent ignored limit order of quantity zero: {}", order)

def placeMarketOrder(self, symbol, direction, quantity, ignore_risk=True):
def placeMarketOrder(self, symbol, quantity, is_buy_order, order_id=None, ignore_risk = True, tag=None):
"""
Used by any Trading Agent subclass to place a market order. The market order is created as multiple limit orders
crossing the spread walking the book until all the quantities are matched.
:param symbol (str): name of the stock traded
:param direction (str): order direction ('BUY' or 'SELL')
:param quantity (int): order quantity
:param log_orders (bool): determines whether cash or risk limits should be enforced or ignored for the order
:return:
Used by any Trading Agent subclass to place a market order. The market order is created as multiple limit orders
crossing the spread walking the book until all the quantities are matched.
:param symbol (str): name of the stock traded
:param quantity (int): order quantity
:param is_buy_order (bool): True if Buy else False
:param order_id: Order ID for market replay
:param ignore_risk (bool): Determines whether cash or risk limits should be enforced or ignored for the order
:return:
"""
order = MarketOrder(self.id, self.currentTime, symbol, quantity, is_buy_order, order_id)
if quantity > 0:
# compute new holdings
new_holdings = self.holdings.copy()
q = quantity if direction == 'BUY' else -quantity
if symbol in new_holdings:
new_holdings[symbol] += q
else:
new_holdings[symbol] = q
q = order.quantity if order.is_buy_order else -order.quantity
if order.symbol in new_holdings: new_holdings[order.symbol] += q
else: new_holdings[order.symbol] = q

if not ignore_risk:
# Compute before and after at-risk capital.
at_risk = self.markToMarket(self.holdings) - self.holdings['CASH']
new_at_risk = self.markToMarket(new_holdings) - new_holdings['CASH']

if (new_at_risk > at_risk) and (new_at_risk > self.starting_cash):
log_print(f"TradingAgent ignored market order due to at-risk constraints: {symbol} {direction} {quantity}\n"
f"{self.fmtHoldings(self.holdings)}")
log_print("TradingAgent ignored market order due to at-risk constraints: {}\n{}",
order, self.fmtHoldings(self.holdings))
return

bids, asks = self.getKnownBidAsk(symbol, best=False)
ob_side = asks if direction == 'BUY' else bids
quotes = {}
if quantity > 0:
for price_level in ob_side:
level_price, level_size = price_level[0], price_level[1]
if quantity <= level_size:
quotes[level_price] = quantity
break
else:
quotes[level_price] = level_size
quantity -= level_size
continue
log_print(f'[---- {self.name} - {self.currentTime} ----]: PLACING 1 MARKET ORDER AS MULTIPLE LIMIT ORDERS')
for quote in quotes.items():
p, q = quote[0], quote[1]
self.placeLimitOrder(symbol, quantity=q, is_buy_order=direction=='BUY', limit_price=p)
log_print(f'[---- {self.name} - {self.currentTime} ----]: LIMIT ORDER PLACED - {q} @ {p}')
self.orders[order.order_id] = deepcopy(order)
self.sendMessage(self.exchangeID, Message({"msg" : "MARKET_ORDER", "sender": self.id, "order": order}))
if self.log_orders: self.logEvent('ORDER_SUBMITTED', order.to_dict())
else:
log_print(f"TradingAgent ignored market order of quantity zero: {symbol} {direction} {quantity}")
log_print("TradingAgent ignored market order of quantity zero: {}", order)

def cancelOrder (self, order):
"""Used by any Trading Agent subclass to cancel any order. The order must currently
@@ -374,7 +365,7 @@ def cancelOrder (self, order):
"order" : order }))

# Log this activity.
if self.log_orders: self.logEvent('CANCEL_SUBMITTED', js.dump(order, strip_privates=True))
if self.log_orders: self.logEvent('CANCEL_SUBMITTED', order.to_dict())

def modifyOrder (self, order, newOrder):
""" Used by any Trading Agent subclass to modify any existing limit order. The order must currently
@@ -384,7 +375,7 @@ def modifyOrder (self, order, newOrder):
"order" : order, "new_order" : newOrder}))

# Log this activity.
if self.log_orders: self.logEvent('MODIFY_ORDER', js.dump(order, strip_privates=True))
if self.log_orders: self.logEvent('MODIFY_ORDER', order.to_dict())


# Handles ORDER_EXECUTED messages from an exchange agent. Subclasses may wish to extend,
@@ -393,7 +384,7 @@ def orderExecuted (self, order):
log_print ("Received notification of execution for: {}", order)

# Log this activity.
if self.log_orders: self.logEvent('ORDER_EXECUTED', js.dump(order, strip_privates=True))
if self.log_orders: self.logEvent('ORDER_EXECUTED', order.to_dict())

# At the very least, we must update CASH and holdings at execution time.
qty = order.quantity if order.is_buy_order else -1 * order.quantity
@@ -433,7 +424,8 @@ def orderAccepted (self, order):
log_print ("Received notification of acceptance for: {}", order)

# Log this activity.
if self.log_orders: self.logEvent('ORDER_ACCEPTED', js.dump(order, strip_privates=True))
if self.log_orders: self.logEvent('ORDER_ACCEPTED', order.to_dict())


# We may later wish to add a status to the open orders so an agent can tell whether
# a given order has been accepted or not (instead of needing to override this method).
@@ -444,7 +436,7 @@ def orderCancelled (self, order):
log_print ("Received notification of cancellation for: {}", order)

# Log this activity.
if self.log_orders: self.logEvent('ORDER_CANCELLED', js.dump(order, strip_privates=True))
if self.log_orders: self.logEvent('ORDER_CANCELLED', order.to_dict())

# Remove the cancelled order from the open orders list. We may of course wish to have
# additional logic here later, so agents can easily "look for" cancelled orders. Of
@@ -509,6 +501,7 @@ def handleMarketData(self, msg):
self.known_asks[symbol] = msg.body['asks']
self.known_bids[symbol] = msg.body['bids']
self.last_trade[symbol] = msg.body['last_transaction']
self.exchange_ts[symbol] = msg.body['exchange_ts']


# Handles QUERY_ORDER_STREAM messages from an exchange agent.
17 changes: 9 additions & 8 deletions agent/examples/SubscriptionAgent.py
Original file line number Diff line number Diff line change
@@ -34,14 +34,15 @@ def receiveMessage(self, currentTime, msg):
super().receiveMessage(currentTime, msg)
if self.subscribe and self.state == 'AWAITING_MARKET_DATA' and msg.body['msg'] == 'MARKET_DATA':
bids, asks = msg.body['bids'], msg.body['asks']
log_print("--------------------")
log_print("current time: {}".format(currentTime))
log_print("seconds elapsed since last update: {}".format((currentTime - self.last_update_ts).delta / 1e9))
log_print("number of bid levels: {}".format(len(bids)))
log_print("number of ask levels: {}".format(len(asks)))
log_print("bids: {}, asks: {}".format(bids, asks))
log_print("--------------------")
print("--------------------")
print("seconds elapsed since last update: {}".format((currentTime - self.last_update_ts).delta / 1e9))
print("number of bid levels: {}".format(len(bids)))
print("number of ask levels: {}".format(len(asks)))
print("bids: {}, asks: {}".format(bids, asks))
print("Current Agent Timestamp: {}".format(currentTime))
print("Exchange Timestamp: {}".format(self.exchange_ts[self.symbol]))
print("--------------------")
self.last_update_ts = currentTime

def getWakeFrequency(self):
return pd.Timedelta('1s')
return pd.Timedelta('1s')
39 changes: 0 additions & 39 deletions agent/execution/AggressiveAgent.py

This file was deleted.

Loading

0 comments on commit bb49f41

Please sign in to comment.