-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBinaryWriter.py
404 lines (322 loc) · 11.6 KB
/
BinaryWriter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# -*- coding:utf-8 -*-
"""
Description:
Binary Writer
Usage:
from neocore.IO.BinaryWriter import BinaryWriter
"""
import sys
import os
import inspect
import struct
import binascii
class BinaryWriter(object):
"""docstring for BinaryWriter"""
def __init__(self, stream):
"""
Create an instance.
Args:
stream (BytesIO): a stream to operate on. i.e. a neo.IO.MemoryStream or raw BytesIO.
"""
super(BinaryWriter, self).__init__()
self.stream = stream
def WriteByte(self, value):
"""
Write a single byte to the stream.
Args:
value (bytes, str or int): value to write to the stream.
"""
if type(value) is bytes:
self.stream.write(value)
elif type(value) is str:
self.stream.write(value.encode('utf-8'))
elif type(value) is int:
self.stream.write(bytes([value]))
def WriteBytes(self, value, unhex=True):
"""
Write a `bytes` type to the stream.
Args:
value (bytes): array of bytes to write to the stream.
unhex (bool): (Default) True. Set to unhexlify the stream. Use when the bytes are not raw bytes; i.e. b'aabb'
Returns:
int: the number of bytes written.
"""
if unhex:
try:
value = binascii.unhexlify(value)
except binascii.Error:
pass
return self.stream.write(value)
def pack(self, fmt, data):
"""
Write bytes by packing them according to the provided format `fmt`.
For more information about the `fmt` format see: https://docs.python.org/3/library/struct.html
Args:
fmt (str): format string.
data (object): the data to write to the raw stream.
Returns:
int: the number of bytes written.
"""
return self.WriteBytes(struct.pack(fmt, data), unhex=False)
def WriteChar(self, value):
"""
Write a 1 byte character value to the stream.
Args:
value: value to write.
Returns:
int: the number of bytes written.
"""
return self.pack('c', value)
def WriteFloat(self, value, endian="<"):
"""
Pack the value as a float and write 4 bytes to the stream.
Args:
value (number): the value to write to the stream.
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sf' % endian, value)
def WriteDouble(self, value, endian="<"):
"""
Pack the value as a double and write 8 bytes to the stream.
Args:
value (number): the value to write to the stream.
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sd' % endian, value)
def WriteInt8(self, value, endian="<"):
"""
Pack the value as a signed byte and write 1 byte to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sb' % endian, value)
def WriteUInt8(self, value, endian="<"):
"""
Pack the value as an unsigned byte and write 1 byte to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sB' % endian, value)
def WriteBool(self, value):
"""
Pack the value as a bool and write 1 byte to the stream.
Args:
value: the boolean value to write.
Returns:
int: the number of bytes written.
"""
return self.pack('?', value)
def WriteInt16(self, value, endian="<"):
"""
Pack the value as a signed integer and write 2 bytes to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sh' % endian, value)
def WriteUInt16(self, value, endian="<"):
"""
Pack the value as an unsigned integer and write 2 bytes to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sH' % endian, value)
def WriteInt32(self, value, endian="<"):
"""
Pack the value as a signed integer and write 4 bytes to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%si' % endian, value)
def WriteUInt32(self, value, endian="<"):
"""
Pack the value as an unsigned integer and write 4 bytes to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sI' % endian, value)
def WriteInt64(self, value, endian="<"):
"""
Pack the value as a signed integer and write 8 bytes to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sq' % endian, value)
def WriteUInt64(self, value, endian="<"):
"""
Pack the value as an unsigned integer and write 8 bytes to the stream.
Args:
value:
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
return self.pack('%sQ' % endian, value)
def WriteUInt160(self, value):
"""
Write a UInt160 type to the stream.
Args:
value (UInt160):
Raises:
Exception: when `value` is not of neocore.UInt160 type.
"""
if type(value) is UInt160:
value.Serialize(self)
else:
raise Exception("value must be UInt160 instance ")
def WriteUInt256(self, value):
"""
Write a UInt256 type to the stream.
Args:
value (UInt256):
Raises:
Exception: when `value` is not of neocore.UInt256 type.
"""
if type(value) is UInt256:
value.Serialize(self)
else:
raise Exception("Cannot write value that is not UInt256")
def WriteVarInt(self, value, endian="<"):
"""
Write an integer value in a space saving way to the stream.
Read more about variable size encoding here: http://docs.neo.org/en-us/node/network-protocol.html#convention
Args:
value (int):
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Raises:
TypeError: if `value` is not of type int.
Exception: if `value` is < 0.
Returns:
int: the number of bytes written.
"""
if not isinstance(value, int):
raise TypeError('%s not int type.' % value)
if value < 0:
raise Exception('%d too small.' % value)
elif value < 0xfd:
return self.WriteByte(value)
elif value <= 0xffff:
self.WriteByte(0xfd)
return self.WriteUInt16(value, endian)
elif value <= 0xFFFFFFFF:
self.WriteByte(0xfe)
return self.WriteUInt32(value, endian)
else:
self.WriteByte(0xff)
return self.WriteUInt64(value, endian)
def WriteVarBytes(self, value, endian="<"):
"""
Write an integer value in a space saving way to the stream.
Read more about variable size encoding here: http://docs.neo.org/en-us/node/network-protocol.html#convention
Args:
value (bytes):
endian (str): specify the endianness. (Default) Little endian ('<'). Use '>' for big endian.
Returns:
int: the number of bytes written.
"""
length = len(value)
self.WriteVarInt(length, endian)
return self.WriteBytes(value, unhex=False)
def WriteVarString(self, value, encoding="utf-8"):
"""
Write a string value to the stream.
Read more about variable size encoding here: http://docs.neo.org/en-us/node/network-protocol.html#convention
Args:
value (string): value to write to the stream.
encoding (str): string encoding format.
"""
if type(value) is str:
value = value.encode(encoding)
length = len(value)
ba = bytearray(value)
byts = binascii.hexlify(ba)
string = byts.decode(encoding)
self.WriteVarInt(length)
self.WriteBytes(string)
def WriteFixedString(self, value, length):
"""
Write a string value to the stream.
Args:
value (str): value to write to the stream.
length (int): length of the string to write.
"""
towrite = value.encode('utf-8')
slen = len(towrite)
if slen > length:
raise Exception("string longer than fixed length: %s " % length)
self.WriteBytes(towrite)
diff = length - slen
while diff > 0:
self.WriteByte(0)
diff -= 1
def WriteSerializableArray(self, array):
"""
Write an array of serializable objects to the stream.
Args:
array(list): a list of serializable objects. i.e. extending neo.IO.Mixins.SerializableMixin
"""
if array is None:
self.WriteByte(0)
else:
self.WriteVarInt(len(array))
for item in array:
item.Serialize(self)
def Write2000256List(self, arr):
"""
Write an array of 64 byte items to the stream.
Args:
arr (list): a list of 2000 items of 64 bytes in size.
"""
for item in arr:
ba = bytearray(binascii.unhexlify(item))
ba.reverse()
self.WriteBytes(ba)
def WriteHashes(self, arr):
"""
Write an array of hashes to the stream.
Args:
arr (list): a list of 32 byte hashes.
"""
length = len(arr)
self.WriteVarInt(length)
for item in arr:
ba = bytearray(binascii.unhexlify(item))
ba.reverse()
# logger.info("WRITING HASH %s " % ba)
self.WriteBytes(ba)
def WriteFixed8(self, value, unsigned=False):
"""
Write a Fixed8 value to the stream.
Args:
value (neo.Fixed8):
unsigned: (Not used)
Returns:
int: the number of bytes written
"""
# if unsigned:
# return self.WriteUInt64(int(value.value))
return self.WriteInt64(value.value)