From dcbfb8a8f83ed444f5caa628729565ef665ba33c Mon Sep 17 00:00:00 2001 From: n0sskrm Date: Tue, 24 Oct 2023 12:30:10 +0200 Subject: [PATCH] changes in stream parsing --- thetadata/client.py | 143 +++++++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 56 deletions(-) diff --git a/thetadata/client.py b/thetadata/client.py index 62c7e3c..ed4a149 100644 --- a/thetadata/client.py +++ b/thetadata/client.py @@ -16,20 +16,22 @@ from tqdm import tqdm import pandas as pd -from . import terminal -from .enums import * -from .parsing import ( +import terminal +from enums import * +from parsing import ( Header, TickBody, ListBody, parse_list_REST, parse_flexible_REST, parse_hist_REST, parse_hist_REST_stream, parse_hist_REST_stream_ijson, ) -from .terminal import check_download, launch_terminal +from terminal import check_download, launch_terminal _NOT_CONNECTED_MSG = "You must establish a connection first." _VERSION = '0.9.11' URL_BASE = "http://127.0.0.1:25510/" +def parse_int(d): + return int.from_bytes(d, "big") def _format_strike(strike: float) -> int: """Round USD to the nearest tenth of a cent, acceptable by the terminal.""" @@ -84,10 +86,8 @@ def __init__(self): self.exchange = None self.date = None - def from_bytes(self, data: bytearray): + def from_bytes(self, view: bytearray): """Deserializes a trade.""" - view = memoryview(data) - parse_int = lambda d: int.from_bytes(d, "big") self.ms_of_day = parse_int(view[0:4]) self.sequence = parse_int(view[4:8]) & 0xffffffffffffffff self.size = parse_int(view[8:12]) @@ -126,10 +126,8 @@ def __init__(self): self.count = 0 self.date = None - def from_bytes(self, data: bytearray): + def from_bytes(self, view: memoryview): """Deserializes a trade.""" - view = memoryview(data) - parse_int = lambda d: int.from_bytes(d, "big") self.ms_of_day = parse_int(view[0:4]) self.open = round(parse_int(view[4:8]) * _pt_to_price_mul[parse_int(view[28:32])], 4) self.high = round(parse_int(view[8:12]) * _pt_to_price_mul[parse_int(view[28:32])], 4) @@ -172,10 +170,9 @@ def __init__(self): self.ask_condition = QuoteCondition.UNDEFINED self.date = None - def from_bytes(self, data: bytes): + def from_bytes(self, data: memoryview): """Deserializes a trade.""" - view = memoryview(data) - parse_int = lambda d: int.from_bytes(d, "big") + view = data mult = _pt_to_price_mul[parse_int(view[36:40])] self.ms_of_day = parse_int(view[0:4]) self.bid_size = parse_int(view[4:8]) @@ -220,7 +217,6 @@ def __init__(self): def from_bytes(self, data: bytearray): """Deserializes open interest.""" view = memoryview(data) - parse_int = lambda d: int.from_bytes(d, "big") self.open_interest = parse_int(view[0:4]) date_raw = str(parse_int(view[4:8])) self.date = date(year=int(date_raw[0:4]), month=int(date_raw[4:6]), day=int(date_raw[6:8])) @@ -244,16 +240,14 @@ def __init__(self): self.isCall = False self.isOption = False - def from_bytes(self, data: bytes): + def from_bytes(self, data: memoryview): """Deserializes a contract.""" - view = memoryview(data) - parse_int = lambda d: int.from_bytes(d, "big") + view = data # parse - len = parse_int(view[:1]) root_len = parse_int(view[1:2]) - self.root = data[2:2 + root_len].decode("ascii") + self.root = data[2:2 + root_len].tobytes().decode("ascii") - opt = parse_int(data[root_len + 2: root_len + 3]) + opt = parse_int(data[root_len + 2: root_len + 3].tobytes()) self.isOption = opt == 1 if not self.isOption: return @@ -321,6 +315,8 @@ def __init__(self, port: int = 11000, timeout: Optional[float] = 60, launch: boo self._stream_req_id = 0 self._stream_connected = False + self.buffer_size = 4096 + print('If you require API support, feel free to join our discord server! http://discord.thetadata.us') if launch: terminal.kill_existing_terminal() @@ -535,45 +531,80 @@ def _recv_stream(self): """ msg = StreamMsg() msg.client = self - parse_int = lambda d: int.from_bytes(d, "big") self._stream_server.settimeout(10) + + _leftover = bytearray() + while self._stream_connected: - try: - msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1])) - msg.contract.from_bytes(self._read_stream(parse_int(self._read_stream(1)[:1]))) - if msg.type == StreamMsgType.QUOTE: - msg.quote.from_bytes(self._read_stream(44)) - elif msg.type == StreamMsgType.TRADE: - data = self._read_stream(n_bytes=32) - msg.trade.from_bytes(data) - elif msg.type == StreamMsgType.OHLCVC: - data = self._read_stream(n_bytes=36) - msg.ohlcvc.from_bytes(data) - elif msg.type == StreamMsgType.PING: - self._read_stream(n_bytes=4) - elif msg.type == StreamMsgType.OPEN_INTEREST: - data = self._read_stream(n_bytes=8) - msg.open_interest.from_bytes(data) - elif msg.type == StreamMsgType.REQ_RESPONSE: - msg.req_response_id = parse_int(self._read_stream(4)) - msg.req_response = StreamResponseType.from_code(parse_int(self._read_stream(4))) - self._stream_responses[msg.req_response_id] = msg.req_response - elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START: - msg.date = datetime.strptime(str(parse_int(self._read_stream(4))), "%Y%m%d").date() - elif msg.type == StreamMsgType.DISCONNECTED or msg.type == StreamMsgType.RECONNECTED: - self._read_stream(4) # Future use. + + byte_buffer = self._read_stream(self.buffer_size) + + if len(_leftover) > 0: + byte_buffer = _leftover + byte_buffer + + _remainder = len(byte_buffer) + byte_buffer = memoryview(byte_buffer) + + bidx = 0 + _bskip = 0 + bjump = 0 + while _bskip < len(byte_buffer): + + if _remainder > 80: + try: + bidx = _bskip + msg.type = StreamMsgType.from_code(parse_int(byte_buffer[bidx:bidx+1])) + bidx += 1 + bjump = parse_int(byte_buffer[bidx:bidx+1]) + bidx += 1 + msg.contract.from_bytes(byte_buffer[bidx:bidx + bjump]) + _bskip += bjump+2 + if msg.type == StreamMsgType.QUOTE: + msg.quote.from_bytes(byte_buffer[_bskip:_bskip+44]) + bjump = 44 + elif msg.type == StreamMsgType.TRADE: + msg.trade.from_bytes(byte_buffer[_bskip:_bskip+32]) + bjump = 32 + elif msg.type == StreamMsgType.OHLCVC: + msg.ohlcvc.from_bytes(byte_buffer[_bskip:_bskip+36]) + bjump = 36 + elif msg.type == StreamMsgType.PING: + bjump = 4 + elif msg.type == StreamMsgType.OPEN_INTEREST: + msg.open_interest.from_bytes(byte_buffer[_bskip:_bskip+8]) + bjump = 8 + elif msg.type == StreamMsgType.REQ_RESPONSE: + msg.req_response_id = parse_int(byte_buffer[_bskip:_bskip+4]) + msg.req_response = StreamResponseType.from_code(parse_int(byte_buffer[_bskip+4:_bskip+8])) + bjump = 8 + self._stream_responses[msg.req_response_id] = msg.req_response + elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START: + msg.date = datetime.strptime(str(parse_int(byte_buffer[_bskip:_bskip+4])), "%Y%m%d").date() + bjump = 4 + elif msg.type == StreamMsgType.DISCONNECTED or msg.type == StreamMsgType.RECONNECTED: + bjump = 4 + else: + bjump = 1 + raise ValueError('undefined msg type: ' + str(msg.type)) + + _bskip += bjump + except (ConnectionResetError, OSError) as e: + msg.type = StreamMsgType.STREAM_DEAD + self._stream_impl(msg) + self._stream_connected = False + return + except Exception as e: + msg.type = StreamMsgType.ERROR + print('Stream error for contract: ' + msg.contract.to_string()) + traceback.print_exc() + _bskip += 1 #If exception, move 1 byte ahead and re-try + else: + self._stream_impl(msg) + + _remainder = len(byte_buffer)-_bskip else: - raise ValueError('undefined msg type: ' + str(msg.type)) - except (ConnectionResetError, OSError) as e: - msg.type = StreamMsgType.STREAM_DEAD - self._stream_impl(msg) - self._stream_connected = False - return - except Exception as e: - msg.type = StreamMsgType.ERROR - print('Stream error for contract: ' + msg.contract.to_string()) - traceback.print_exc() - self._stream_impl(msg) + _leftover = bytearray(byte_buffer[len(byte_buffer)-_remainder:len(byte_buffer)].tobytes()).copy() + break def _read_stream(self, n_bytes: int) -> bytearray: """from_bytes