This repository was archived by the owner on Sep 24, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathtest_datastore.py
132 lines (104 loc) · 3.67 KB
/
test_datastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""datastore tests."""
from __future__ import print_function
import json
import os
import pytest
import wandb
from wandb.internal import datastore
from wandb.proto import wandb_internal_pb2 # type: ignore
FNAME = "test.dat"
try:
FileNotFoundError
except NameError:
FileNotFoundError = OSError
def check(ds, chunk_sizes=tuple(), expected_records=0, expected_pad=0, expected_record_sizes=None):
"""Check datastore size after multiple items written."""
record_sizes = []
for _, chunk_size in enumerate(chunk_sizes):
size = ds._write_data(b'\x01' * chunk_size)
record_sizes.append(size)
num = 7 + sum(chunk_sizes) + expected_records * 7 + expected_pad
ds.close()
s = os.stat(FNAME)
assert s.st_size == num
if expected_record_sizes is not None:
assert tuple(record_sizes) == expected_record_sizes
@pytest.fixture()
def with_datastore(request):
"""Fixture which returns an initialized datastore."""
try:
os.unlink(FNAME)
except FileNotFoundError:
pass
wandb._set_internal_process()
s = datastore.DataStore()
s.open_for_write(FNAME)
def fin():
os.unlink(FNAME)
request.addfinalizer(fin)
return s
def test_proto_write_partial():
"""Serialize a proto into a partial block."""
data = dict(this=2, that=4)
history = wandb_internal_pb2.HistoryRecord()
for k, v in data.items():
json_data = json.dumps(v)
item = history.item.add()
item.key = k
item.value_json = json_data
rec = wandb_internal_pb2.Record()
rec.history.CopyFrom(history)
wandb._set_internal_process()
s = datastore.DataStore()
s.open_for_write(FNAME)
s.write(rec)
s.close()
def test_data_write_full(with_datastore):
"""Write a full block."""
sizes, records = tuple([32768 - 7 - 7]), 1
check(with_datastore, chunk_sizes=sizes, expected_records=records)
def test_data_write_overflow(with_datastore):
"""Write one more than we can fit in a block."""
ds = with_datastore
ds._write_data(b'\x01' * (32768 - 7 - 7 + 1))
ds.close()
s = os.stat(FNAME)
assert s.st_size == 32768 + 7 + 1
def test_data_write_pad(with_datastore):
"""Pad 6 bytes with zeros, then write next record."""
ds = with_datastore
ds._write_data(b'\x01' * (32768 - 7 - 7 - 6))
ds._write_data(b'\x02' * (1))
ds.close()
s = os.stat(FNAME)
assert s.st_size == 32768 + 7 + 1
def test_data_write_empty(with_datastore):
"""Write empty record with zero length, then write next record."""
ds = with_datastore
ds._write_data(b'\x01' * (32768 - 7 - 7 - 7))
ds._write_data(b'\x02' * (1))
ds.close()
s = os.stat(FNAME)
assert s.st_size == 32768 + 7 + 1
def test_data_write_split(with_datastore):
"""Leave just room for 1 more byte, then try to write 2."""
ds = with_datastore
ds._write_data(b'\x01' * (32768 - 7 - 7 - 8))
ds._write_data(b'\x02' * (2))
ds.close()
s = os.stat(FNAME)
assert s.st_size == 32768 + 7 + 1
def test_data_write_split_overflow(with_datastore):
"""Leave just room for 1 more byte, then write a block + 1 byte."""
ds = with_datastore
ds._write_data(b'\x01' * (32768 - 7 - 7 - 8))
ds._write_data(b'\x02' * (2 + 32768 - 7))
ds.close()
s = os.stat(FNAME)
assert s.st_size == 32768 * 2 + 7 + 1
def test_data_write_fill(with_datastore):
"""Leave just room for 1 more byte, then write a 1 byte, followed by another 1 byte."""
sizes = tuple([32768 - 7 - 7 - 8, 1, 1])
records = 3
lengths = (7, 32753, 0, 0), (32760, 8, 0, 0), (32768, 8, 0, 0)
check(with_datastore, chunk_sizes=sizes, expected_records=records, expected_record_sizes=lengths)