forked from gpt-engineer-org/gpt-engineer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_db.py
126 lines (85 loc) · 3.12 KB
/
test_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import pytest
from gpt_engineer.db import DB, DBs
def test_DB_operations(tmp_path):
    """Exercise the basic item-access contract of ``DB``.

    Verifies that assigning a string creates a file named after the key in
    the directory handed to ``DB``, that reading the key returns the stored
    string, that an unknown key raises ``KeyError`` and that a list value is
    rejected with ``TypeError``.
    """
    store = DB(tmp_path)

    # Writing through __setitem__ is expected to leave a file on disk.
    store["test_key"] = "test_value"
    stored_file = tmp_path / "test_key"
    assert stored_file.is_file()

    # Reading through __getitem__ round-trips the value.
    assert store["test_key"] == "test_value"

    # Looking up a key that was never written must fail loudly.
    with pytest.raises(KeyError):
        store["non_existent"]

    # Values that are neither str nor bytes are refused.
    with pytest.raises(TypeError):
        store["key"] = ["Invalid", "value"]
def test_DBs_initialization(tmp_path):
    """Check that each field of a ``DBs`` built from five ``DB`` objects is a ``DB``.

    One ``DB`` is created per sub-directory of ``tmp_path`` and the five are
    passed positionally to ``DBs``.
    """
    field_names = ("memory", "logs", "preprompts", "input", "workspace")

    # One DB per named sub-directory, in the order DBs receives them.
    stores = [DB(tmp_path / field) for field in field_names]
    bundle = DBs(*stores)

    for field in field_names:
        assert isinstance(getattr(bundle, field), DB)
def test_invalid_path(tmp_path):
    """Check that ``DB`` raises ``OSError`` for a location it cannot use.

    A hard-coded target such as ``/root/test`` only fails for an unprivileged
    user on a POSIX layout: when the suite runs as root (common in
    containers) or on Windows the path can be created, so the test fails and
    leaves a stray directory behind. A path nested under a regular file can
    never become a directory, for any user on any platform, which makes the
    expected failure deterministic and keeps all side effects in ``tmp_path``.

    NOTE(review): like the original test, this relies on ``DB`` touching the
    filesystem in its constructor -- confirm against ``gpt_engineer.db``.
    """
    # A regular file occupying the spot where a parent directory is needed.
    blocker = tmp_path / "not_a_directory"
    blocker.write_text("occupied")

    # PermissionError is a subclass of OSError, so OSError alone covers the
    # original expectation as well as NotADirectoryError / FileExistsError.
    with pytest.raises(OSError):
        DB(blocker / "test")
def test_large_files(tmp_path):
    """Check that a one-million-character value survives a write/read round trip."""
    payload_size = 10**6  # 1MB of data
    payload = "a" * payload_size

    store = DB(tmp_path)

    # Write the large value, then read it back and compare in full.
    store["large_file"] = payload
    assert store["large_file"] == payload
def test_concurrent_access(tmp_path):
    """Write to one ``DB`` from several threads and verify every entry.

    Ten threads each store 1000 distinct keys in the same ``DB``. Once all
    threads have been joined, every key must be reported as present and must
    read back as the value that was written for it.
    """
    import threading

    store = DB(tmp_path)
    num_threads = 10
    num_writes = 1000

    def key_for(worker, index):
        # Keys are unique per (thread, write) pair, so writers never collide.
        return f"thread{worker}_write{index}"

    def write_to_db(thread_id):
        for index in range(num_writes):
            store[key_for(thread_id, index)] = str(index)

    # Launch each writer as soon as it is created.
    workers = []
    for worker in range(num_threads):
        thread = threading.Thread(target=write_to_db, args=(worker,))
        thread.start()
        workers.append(thread)

    # Wait for every writer before inspecting the results.
    for thread in workers:
        thread.join()

    # Verify that all expected data was written.
    for worker in range(num_threads):
        for index in range(num_writes):
            key = key_for(worker, index)
            assert key in store  # exercises DB.__contains__
            assert store[key] == str(index)
def test_error_messages(tmp_path):
    """Check the exact ``TypeError`` message raised for an unsupported value type."""
    expected_message = "val must be either a str or bytes"
    store = DB(tmp_path)

    with pytest.raises(TypeError) as exc_info:
        store["key"] = ["Invalid", "value"]

    # Inspected after the block: inside it, this line would never be reached.
    assert str(exc_info.value) == expected_message
def test_DBs_instantiation_with_wrong_number_of_arguments(tmp_path):
    """Check that ``DBs`` rejects a wrong number of positional arguments.

    Both too few (three) and too many (six) ``DB`` arguments must raise
    ``TypeError``.
    """
    store = DB(tmp_path)

    # The same DB object is reused for every position; only the count matters.
    for arg_count in (3, 6):
        with pytest.raises(TypeError):
            DBs(*([store] * arg_count))
def test_DBs_dataclass_attributes(tmp_path):
    """Check that ``DBs`` maps its positional arguments to fields in order.

    Five ``DB`` objects are passed positionally; the ``memory``, ``logs``,
    ``preprompts``, ``input`` and ``workspace`` attributes must compare equal
    to the first through fifth argument respectively.
    """
    field_names = ["memory", "logs", "preprompts", "input", "workspace"]

    # One DB per named sub-directory, in the order DBs receives them.
    stores = [DB(tmp_path / field) for field in field_names]
    bundle = DBs(*stores)

    for field, expected in zip(field_names, stores):
        assert getattr(bundle, field) == expected