Skip to content
This repository was archived by the owner on Nov 18, 2024. It is now read-only.

changes in stream parsing #24

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 87 additions & 56 deletions thetadata/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@
from tqdm import tqdm
import pandas as pd

from . import terminal
from .enums import *
from .parsing import (
import terminal
from enums import *
from parsing import (
Header,
TickBody,
ListBody,
parse_list_REST, parse_flexible_REST, parse_hist_REST, parse_hist_REST_stream, parse_hist_REST_stream_ijson,
)
from .terminal import check_download, launch_terminal
from terminal import check_download, launch_terminal

_NOT_CONNECTED_MSG = "You must establish a connection first."
_VERSION = '0.9.11'
URL_BASE = "http://127.0.0.1:25510/"

def parse_int(d):
return int.from_bytes(d, "big")

def _format_strike(strike: float) -> int:
"""Round USD to the nearest tenth of a cent, acceptable by the terminal."""
Expand Down Expand Up @@ -84,10 +86,8 @@ def __init__(self):
self.exchange = None
self.date = None

def from_bytes(self, data: bytearray):
def from_bytes(self, view: bytearray):
"""Deserializes a trade."""
view = memoryview(data)
parse_int = lambda d: int.from_bytes(d, "big")
self.ms_of_day = parse_int(view[0:4])
self.sequence = parse_int(view[4:8]) & 0xffffffffffffffff
self.size = parse_int(view[8:12])
Expand Down Expand Up @@ -126,10 +126,8 @@ def __init__(self):
self.count = 0
self.date = None

def from_bytes(self, data: bytearray):
def from_bytes(self, view: memoryview):
"""Deserializes a trade."""
view = memoryview(data)
parse_int = lambda d: int.from_bytes(d, "big")
self.ms_of_day = parse_int(view[0:4])
self.open = round(parse_int(view[4:8]) * _pt_to_price_mul[parse_int(view[28:32])], 4)
self.high = round(parse_int(view[8:12]) * _pt_to_price_mul[parse_int(view[28:32])], 4)
Expand Down Expand Up @@ -172,10 +170,9 @@ def __init__(self):
self.ask_condition = QuoteCondition.UNDEFINED
self.date = None

def from_bytes(self, data: bytes):
def from_bytes(self, data: memoryview):
"""Deserializes a trade."""
view = memoryview(data)
parse_int = lambda d: int.from_bytes(d, "big")
view = data
mult = _pt_to_price_mul[parse_int(view[36:40])]
self.ms_of_day = parse_int(view[0:4])
self.bid_size = parse_int(view[4:8])
Expand Down Expand Up @@ -220,7 +217,6 @@ def __init__(self):
def from_bytes(self, data: bytearray):
"""Deserializes open interest."""
view = memoryview(data)
parse_int = lambda d: int.from_bytes(d, "big")
self.open_interest = parse_int(view[0:4])
date_raw = str(parse_int(view[4:8]))
self.date = date(year=int(date_raw[0:4]), month=int(date_raw[4:6]), day=int(date_raw[6:8]))
Expand All @@ -244,16 +240,14 @@ def __init__(self):
self.isCall = False
self.isOption = False

def from_bytes(self, data: bytes):
def from_bytes(self, data: memoryview):
"""Deserializes a contract."""
view = memoryview(data)
parse_int = lambda d: int.from_bytes(d, "big")
view = data
# parse
len = parse_int(view[:1])
root_len = parse_int(view[1:2])
self.root = data[2:2 + root_len].decode("ascii")
self.root = data[2:2 + root_len].tobytes().decode("ascii")

opt = parse_int(data[root_len + 2: root_len + 3])
opt = parse_int(data[root_len + 2: root_len + 3].tobytes())
self.isOption = opt == 1
if not self.isOption:
return
Expand Down Expand Up @@ -321,6 +315,8 @@ def __init__(self, port: int = 11000, timeout: Optional[float] = 60, launch: boo
self._stream_req_id = 0
self._stream_connected = False

self.buffer_size = 4096

print('If you require API support, feel free to join our discord server! http://discord.thetadata.us')
if launch:
terminal.kill_existing_terminal()
Expand Down Expand Up @@ -535,45 +531,80 @@ def _recv_stream(self):
"""
msg = StreamMsg()
msg.client = self
parse_int = lambda d: int.from_bytes(d, "big")
self._stream_server.settimeout(10)

_leftover = bytearray()

while self._stream_connected:
try:
msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1]))
msg.contract.from_bytes(self._read_stream(parse_int(self._read_stream(1)[:1])))
if msg.type == StreamMsgType.QUOTE:
msg.quote.from_bytes(self._read_stream(44))
elif msg.type == StreamMsgType.TRADE:
data = self._read_stream(n_bytes=32)
msg.trade.from_bytes(data)
elif msg.type == StreamMsgType.OHLCVC:
data = self._read_stream(n_bytes=36)
msg.ohlcvc.from_bytes(data)
elif msg.type == StreamMsgType.PING:
self._read_stream(n_bytes=4)
elif msg.type == StreamMsgType.OPEN_INTEREST:
data = self._read_stream(n_bytes=8)
msg.open_interest.from_bytes(data)
elif msg.type == StreamMsgType.REQ_RESPONSE:
msg.req_response_id = parse_int(self._read_stream(4))
msg.req_response = StreamResponseType.from_code(parse_int(self._read_stream(4)))
self._stream_responses[msg.req_response_id] = msg.req_response
elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START:
msg.date = datetime.strptime(str(parse_int(self._read_stream(4))), "%Y%m%d").date()
elif msg.type == StreamMsgType.DISCONNECTED or msg.type == StreamMsgType.RECONNECTED:
self._read_stream(4) # Future use.

byte_buffer = self._read_stream(self.buffer_size)

if len(_leftover) > 0:
byte_buffer = _leftover + byte_buffer

_remainder = len(byte_buffer)
byte_buffer = memoryview(byte_buffer)

bidx = 0
_bskip = 0
bjump = 0
while _bskip < len(byte_buffer):

if _remainder > 80:
try:
bidx = _bskip
msg.type = StreamMsgType.from_code(parse_int(byte_buffer[bidx:bidx+1]))
bidx += 1
bjump = parse_int(byte_buffer[bidx:bidx+1])
bidx += 1
msg.contract.from_bytes(byte_buffer[bidx:bidx + bjump])
_bskip += bjump+2
if msg.type == StreamMsgType.QUOTE:
msg.quote.from_bytes(byte_buffer[_bskip:_bskip+44])
bjump = 44
elif msg.type == StreamMsgType.TRADE:
msg.trade.from_bytes(byte_buffer[_bskip:_bskip+32])
bjump = 32
elif msg.type == StreamMsgType.OHLCVC:
msg.ohlcvc.from_bytes(byte_buffer[_bskip:_bskip+36])
bjump = 36
elif msg.type == StreamMsgType.PING:
bjump = 4
elif msg.type == StreamMsgType.OPEN_INTEREST:
msg.open_interest.from_bytes(byte_buffer[_bskip:_bskip+8])
bjump = 8
elif msg.type == StreamMsgType.REQ_RESPONSE:
msg.req_response_id = parse_int(byte_buffer[_bskip:_bskip+4])
msg.req_response = StreamResponseType.from_code(parse_int(byte_buffer[_bskip+4:_bskip+8]))
bjump = 8
self._stream_responses[msg.req_response_id] = msg.req_response
elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START:
msg.date = datetime.strptime(str(parse_int(byte_buffer[_bskip:_bskip+4])), "%Y%m%d").date()
bjump = 4
elif msg.type == StreamMsgType.DISCONNECTED or msg.type == StreamMsgType.RECONNECTED:
bjump = 4
else:
bjump = 1
raise ValueError('undefined msg type: ' + str(msg.type))

_bskip += bjump
except (ConnectionResetError, OSError) as e:
msg.type = StreamMsgType.STREAM_DEAD
self._stream_impl(msg)
self._stream_connected = False
return
except Exception as e:
msg.type = StreamMsgType.ERROR
print('Stream error for contract: ' + msg.contract.to_string())
traceback.print_exc()
_bskip += 1 #If exception, move 1 byte ahead and re-try
else:
self._stream_impl(msg)

_remainder = len(byte_buffer)-_bskip
else:
raise ValueError('undefined msg type: ' + str(msg.type))
except (ConnectionResetError, OSError) as e:
msg.type = StreamMsgType.STREAM_DEAD
self._stream_impl(msg)
self._stream_connected = False
return
except Exception as e:
msg.type = StreamMsgType.ERROR
print('Stream error for contract: ' + msg.contract.to_string())
traceback.print_exc()
self._stream_impl(msg)
_leftover = bytearray(byte_buffer[len(byte_buffer)-_remainder:len(byte_buffer)].tobytes()).copy()
break

def _read_stream(self, n_bytes: int) -> bytearray:
"""from_bytes
Expand Down