Skip to content

Commit

Permalink
Merge branch 'finance' into master_finance_merge
Browse files Browse the repository at this point in the history
Conflicts:
	zipline/messaging.py
	zipline/test/client.py
  • Loading branch information
fawce committed Mar 1, 2012
2 parents 2827c20 + 7846a1e commit 6283906
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 350 deletions.
2 changes: 1 addition & 1 deletion etc/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ pyzmq==2.1.11
gevent-zeromq==0.2.2
msgpack-python==0.1.12
humanhash==0.0.1
ujson=1.18
ujson==1.18
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ detailed-errors=1


# Drop into debugger on failure
pdb=0
pdb-failures=0
#pdb=0
#pdb-failures=0

198 changes: 111 additions & 87 deletions zipline/finance/trading.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import datetime
import pytz
import math

from zmq.core.poll import select

Expand All @@ -17,46 +19,44 @@ def __init__(self):

@property
def get_id(self):
return "TRADING_CLIENT"
return str(zp.FINANCE_COMPONENT.TRADING_CLIENT)

def open(self):
self.result_feed = self.connect_result()
self.order_socket = self.connect_order()

def do_work(self):
#next feed event
(rlist, wlist, xlist) = select([self.result_feed],
[],
[self.result_feed],
timeout=self.heartbeat_timeout/100) #select timeout is in sec, use 10x
#
#no more orders, should be an error condition
if len(rlist) == 0 or len(xlist) > 0:
raise Exception("unexpected end of feed stream")
message = rlist[0].recv()
if message == str(zp.CONTROL_PROTOCOL.DONE):
self.signal_done()
return #leave open orders hanging? client requests for orders?
socks = dict(self.poll.poll(self.heartbeat_timeout))

if self.result_feed in socks and socks[self.result_feed] == self.zmq.POLLIN:
msg = self.result_feed.recv()

if msg == str(zp.CONTROL_PROTOCOL.DONE):
qutil.LOGGER.info("Client is DONE!")
self.signal_done()
return

event = zp.MERGE_UNFRAME(message)
self._handle_event(event)
event = zp.MERGE_UNFRAME(msg)
self._handle_event(event)

def connect_order(self):
return self.connect_push_socket(self.addresses['order_address'])

def _handle_event(self, event):
self.event_queue.append(event)
if event.SIM_DT <= event.dt:
#event occurred in the present, send the queue to be processed
self.handle_events(self.event_queue)
self.order_socket.send(str(zp.CONTROL_PROTOCOL.DONE))
self.handle_event(event)
#signal done to order source.
self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK))

def handle_events(self, event_queue):
def handle_event(self, event):
raise NotImplementedError

def order(self, sid, amount):
self.order_socket.send(zp.ORDER_FRAME(sid, amount))

def signal_order_done(self):
self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE))

class OrderDataSource(qmsg.DataSource):
"""DataSource that relays orders from the client"""

Expand All @@ -71,12 +71,13 @@ def __init__(self, simulation_dt):
'volume' : integer for volume
}
"""
zm.DataSource.__init__(self, "ORDER_SIM")
qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE)
self.simulation_dt = simulation_dt
self.last_iteration_duration = datetime.timedelta(seconds=0)
self.sent_count = 0

def get_type(self):
return 'ORDER_SIM'
return zp.FINANCE_COMPONENT.ORDER_SOURCE

def open(self):
qmsg.DataSource.open(self)
Expand All @@ -85,16 +86,19 @@ def open(self):
def bind_order(self):
return self.bind_pull_socket(self.addresses['order_address'])

def do_work(self):
def do_work(self):
#mark the start time for client's processing of this event.
self.event_start = datetime.datetime.utcnow()
self.result_socket.send(zp.TRANSFORM_FRAME('ORDER_SIM', self.simulation_dt), self.zmq.NOBLOCK)

self.simulation_dt = self.simulation_dt + self.last_iteration_duration

#TODO: if this is the first iteration, break deadlock by sending a dummy order
if(self.sent_count == 0):
self.send_dummy()

#pull all orders from client.
orders = []
order_dt = None
count = 0
while True:
(rlist, wlist, xlist) = select([self.order_socket],
[],
Expand All @@ -106,7 +110,11 @@ def do_work(self):
continue

order_msg = rlist[0].recv()
if order_msg == str(zp.CONTROL_PROTOCOL.DONE):
if order_msg == str(zp.ORDER_PROTOCOL.DONE):
self.signal_done()
return

if order_msg == str(zp.ORDER_PROTOCOL.BREAK):
qutil.LOGGER.info("order loop finished")
break

Expand All @@ -115,21 +123,35 @@ def do_work(self):

self.last_iteration_duration = datetime.datetime.utcnow() - self.event_start
dt = self.simulation_dt + self.last_iteration_duration
order_event = zp.namedict({"sid":sid, "amount":amount, "dt":dt, source_id=self.get_id})
order_event = zp.namedict({"sid":sid, "amount":amount, "dt":dt, "source_id":self.get_id, "type":zp.DATASOURCE_TYPE.ORDER})

message = zp.DATASOURCE_FRAME(event)
self.data_socket.send(message)
self.send(order_event)
count += 1
self.sent_count += 1

#TODO: we have to send at least one dummy order per do_work iteration or the feed will block waiting for our messages.
if(count == 0):
self.send_dummy()
self.sent_count += 1

def send(self, order_event):
message = zp.DATASOURCE_FRAME(order_event)
self.data_socket.send(message)

def send_dummy(self):
dt = self.simulation_dt + self.last_iteration_duration
dummy_order = zp.namedict({"sid":0, "amount":0, "dt":dt, "source_id":self.get_id, "type":zp.DATASOURCE_TYPE.ORDER})
self.send(dummy_order)



class TransactionSimulator(qmsg.BaseTransform):

def __init__(self):
qmsg.BaseTransform.__init__(self, "TRANSACTION_SIM")
qmsg.BaseTransform.__init__(self, zp.TRANSFORM_TYPE.TRANSACTION)
self.open_orders = {}
self.order_count = 0
self.tradeWindow = datetime.timedelta(seconds=30)
self.trade_windwo = datetime.timedelta(seconds=30)
self.orderTTL = datetime.timedelta(days=1)
self.volume_share = 0.05
self.commission = 0.03
Expand All @@ -139,83 +161,85 @@ def transform(self, event):
Pulls one message from the event feed, then
loops on orders until client sends DONE message.
"""
if(event.type == "ORDER_SIM"):
self.add_open_order(event.sid, event.amount)
self.state['value'] = self.average
elif(event.type == "EQUITY_TRADE"):
txn = apply_trade_to_open_orders(event)
#TODO: need a way to send a placeholder txn, to avoid blocking merge... maybe customize merge to not block on txn?
if(event.type == zp.DATASOURCE_TYPE.ORDER):
self.add_open_order(event)
self.state['value'] = None
elif(event.type == zp.DATASOURCE_TYPE.TRADE):
txn = self.apply_trade_to_open_orders(event)
self.state['value'] = txn
else:
self.state['value'] = None
qutil.LOGGER.info("unexpected event type in transform: {etype}".format(etype=event.type))
#TODO: what to do if we get another kind of datasource event.type?

return self.state

def add_open_order(self, sid, amount):
def add_open_order(self, event):
"""Orders are captured in a buffer by sid. No calculations are done here.
Amount is explicitly converted to an int.
Orders of amount zero are ignored.
"""
amount = int(amount)
if amount == 0:
qutil.LOGGER.debug("{title}:{id} requested to trade zero shares of {sid}".format(sid=sid,
title=self.hostedAlgo.algo.title,
id=self.hostedAlgo.algo.id))
event.amount = int(event.amount)
if event.amount == 0:
qutil.LOGGER.debug("requested to trade zero shares of {sid}".format(sid=event.sid))
return

self.order_count += 1
order = zp.namedict({'sid' : sid,
'amount' : amount,
'dt' : self.algo_time},
'filled': 0,
'direction': math.fabs(amount) / amount)

if(not self.open_orders.has_key(sid)):
self.open_orders[sid] = []
self.open_orders[sid].append(order)
if(not self.open_orders.has_key(event.sid)):
self.open_orders[event.sid] = []
self.open_orders[event.sid].append(event)

def apply_trade_to_open_orders(self, event):

if(event.volume == 0):
#there are zero volume events bc some stocks trade less frequently than once per minute.
continue
return self.create_dummy_txn(event.dt)

if self.open_orders.has_key(event.sid):
orders = self.open_orders[event.sid]
remaining_orders = []
total_order = 0
dt = event.dt

for order in orders:
#we're using minute bars, so allow orders within 30 seconds of the trade
if((order.dt - event.dt) < self.tradeWindow):
total_order += order.amount
if(order.dt > dt):
dt = order.dt
#if the order still has time to live (TTL) keep track
elif((self.algo_time - order.dt) < self.orderTTL):
remaining_orders.append(order)

self.open_orders[event.sid] = remaining_orders

if(total_order != 0):
direction = total_order / math.fabs(total_order)
volume_share = (direction * total_order) / event.volume
if volume_share > .25:
volume_share = .25
amount = volume_share * event.volume * direction
impact = (volShare)**2 * .1 * direction * event.price
return self.create_transaction(event.sid, amount, event.price + impact, dt.replace(tzinfo = pytz.utc), direction)

else:
return None

remaining_orders = []
total_order = 0
dt = event.dt

for order in orders:
#we're using minute bars, so allow orders within 30 seconds of the trade
if((order.dt - event.dt) < self.trade_windwo):
total_order += order.amount
if(order.dt > dt):
dt = order.dt
#if the order still has time to live (TTL) keep track
elif((self.algo_time - order.dt) < self.orderTTL):
remaining_orders.append(order)

self.open_orders[event.sid] = remaining_orders

if(total_order != 0):
direction = total_order / math.fabs(total_order)
else:
direction = 1

volume_share = (direction * total_order) / event.volume
if volume_share > .25:
volume_share = .25
amount = volume_share * event.volume * direction
impact = (volume_share)**2 * .1 * direction * event.price
return self.create_transaction(event.sid, amount, event.price + impact, dt.replace(tzinfo = pytz.utc), direction)


def create_transaction(self, sid, amount, price, dt, direction):
if(amount != 0):
txn = {'sid' : sid,
'amount' : amount,
'dt' : dt,
'price' : price,
'back_test_run_id' : self.btRun.id,
'transaction_cost' : -1*(price * amount),
'commision' : self.commission * amount * direction}

return namedict(txn)
txn = {'sid' : sid,
'amount' : int(amount),
'dt' : dt,
'price' : price,
'commission' : self.commission * amount * direction,
'source_id' : zp.FINANCE_COMPONENT.TRANSACTION_SIM
}
return zp.namedict(txn)



Expand Down
4 changes: 2 additions & 2 deletions zipline/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,6 @@ class PassthroughTransform(BaseTransform):

def __init__(self):
BaseTransform.__init__(self, "PASSTHROUGH")

self.init()

def init(self):
Expand All @@ -486,8 +485,9 @@ def init(self):
def get_type(self):
return COMPONENT_TYPE.CONDUIT

#TODO, could save some cycles by skipping the _UNFRAME call and just setting value to original msg string.
def transform(self, event):
return { 'value': event }
return {'name':zp.TRANSFORM_TYPE.PASSTHROUGH, 'value': zp.DATASOURCE_FRAME(event) }


class DataSource(Component):
Expand Down
Loading

0 comments on commit 6283906

Please sign in to comment.