forked from HelloZeroNet/ZeroNet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTestPeer.py
159 lines (119 loc) · 6.22 KB
/
TestPeer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import time
import io
import pytest
from File import FileServer
from File import FileRequest
from Crypt import CryptHash
from . import Spy
@pytest.mark.usefixtures("resetSettings")
@pytest.mark.usefixtures("resetTempSettings")
class TestPeer:
    """Peer-level tests: ping, file download, hashfield handling and hash lookup.

    Most tests build a second ``FileServer`` object on port 1545 and add the
    ``file_server`` fixture (addressed on port 1544) as a peer of ``site_temp``.
    Every secondary server is stopped in a ``finally`` clause so that a failing
    assertion does not leave it (or its connections) alive for later tests.
    """

    def testPing(self, file_server, site, site_temp):
        """Ping the file server through a peer, then remove the peer again."""
        file_server.sites[site.address] = site
        client = FileServer(file_server.ip, 1545)
        try:
            client.sites = {site_temp.address: site_temp}
            site_temp.connection_server = client
            connection = client.getConnection(file_server.ip, 1544)
            try:
                # Add file_server as peer to client
                peer_file_server = site_temp.addPeer(file_server.ip, 1544)

                assert peer_file_server.ping() is not None

                assert peer_file_server in site_temp.peers.values()
                peer_file_server.remove()

                assert peer_file_server not in site_temp.peers.values()
            finally:
                # Close the connection even when an assertion above failed
                connection.close()
        finally:
            client.stop()

    def testDownloadFile(self, file_server, site, site_temp):
        """Fetch content.json from the peer, both streamed and non-streamed."""
        file_server.sites[site.address] = site
        client = FileServer(file_server.ip, 1545)
        try:
            client.sites = {site_temp.address: site_temp}
            site_temp.connection_server = client
            connection = client.getConnection(file_server.ip, 1544)
            try:
                # Add file_server as peer to client
                peer_file_server = site_temp.addPeer(file_server.ip, 1544)

                # Testing streamFile
                buff = peer_file_server.getFile(site_temp.address, "content.json", streaming=True)
                assert b"sign" in buff.getvalue()

                # Testing getFile
                buff = peer_file_server.getFile(site_temp.address, "content.json")
                assert b"sign" in buff.getvalue()
            finally:
                # Close the connection even when an assertion above failed
                connection.close()
        finally:
            client.stop()

    def testHashfield(self, site):
        """Check membership, append and remove on the site's own hashfield."""
        sample_hash = list(site.content_manager.contents["content.json"]["files_optional"].values())[0]["sha512"]

        site.storage.verifyFiles(quick_check=True)  # Find what optional files we have

        # Check if hashfield has any files
        assert site.content_manager.hashfield
        assert len(site.content_manager.hashfield) > 0

        # Check existing hash
        assert site.content_manager.hashfield.getHashId(sample_hash) in site.content_manager.hashfield

        # Add new hash
        new_hash = CryptHash.sha512sum(io.BytesIO(b"hello"))
        assert site.content_manager.hashfield.getHashId(new_hash) not in site.content_manager.hashfield
        assert site.content_manager.hashfield.appendHash(new_hash)
        assert not site.content_manager.hashfield.appendHash(new_hash)  # Don't add second time
        assert site.content_manager.hashfield.getHashId(new_hash) in site.content_manager.hashfield

        # Remove new hash
        assert site.content_manager.hashfield.removeHash(new_hash)
        assert site.content_manager.hashfield.getHashId(new_hash) not in site.content_manager.hashfield

    def testHashfieldExchange(self, file_server, site, site_temp):
        """Exchange hashfields between two servers by query and by push."""
        server1 = file_server
        server1.sites[site.address] = site
        site.connection_server = server1

        server2 = FileServer(file_server.ip, 1545)
        try:
            server2.sites[site_temp.address] = site_temp
            site_temp.connection_server = server2
            site.storage.verifyFiles(quick_check=True)  # Find what optional files we have

            # Add file_server as peer to client
            server2_peer1 = site_temp.addPeer(file_server.ip, 1544)

            # Check if hashfield has any files
            assert len(site.content_manager.hashfield) > 0

            # Testing hashfield sync
            assert len(server2_peer1.hashfield) == 0
            assert server2_peer1.updateHashfield()  # Query hashfield from peer
            assert len(server2_peer1.hashfield) > 0

            # Test force push new hashfield
            site_temp.content_manager.hashfield.appendHash("AABB")
            server1_peer2 = site.addPeer(file_server.ip, 1545, return_peer=True)
            # NOTE(review): Spy presumably records each FileRequest.route call
            # made inside the block - confirm against the Spy helper
            with Spy.Spy(FileRequest, "route") as requests:
                assert len(server1_peer2.hashfield) == 0
                server2_peer1.sendMyHashfield()
                assert len(server1_peer2.hashfield) == 1
                server2_peer1.sendMyHashfield()  # Hashfield not changed, should be ignored

                assert len(requests) == 1

                time.sleep(0.01)  # To make hashfield change date different

                site_temp.content_manager.hashfield.appendHash("AACC")
                server2_peer1.sendMyHashfield()  # Push hashfield

                assert len(server1_peer2.hashfield) == 2
                assert len(requests) == 2

                site_temp.content_manager.hashfield.appendHash("AADD")

                assert server1_peer2.updateHashfield(force=True)  # Request hashfield
                assert len(server1_peer2.hashfield) == 3
                assert len(requests) == 3

                assert not server2_peer1.sendMyHashfield()  # Not changed, should be ignored
                assert len(requests) == 3
        finally:
            # Stop the second server even when an assertion above failed
            server2.stop()

    def testFindHash(self, file_server, site, site_temp):
        """Ask the peer which addresses hold given hash ids."""
        file_server.sites[site.address] = site
        client = FileServer(file_server.ip, 1545)
        try:
            client.sites = {site_temp.address: site_temp}
            site_temp.connection_server = client

            # Add file_server as peer to client
            peer_file_server = site_temp.addPeer(file_server.ip, 1544)

            assert peer_file_server.findHashIds([1234]) == {}

            # Add fake peers with the required hashes
            fake_peer_1 = site.addPeer(file_server.ip_external, 1544)
            fake_peer_1.hashfield.append(1234)
            fake_peer_2 = site.addPeer("1.2.3.5", 1545)
            fake_peer_2.hashfield.append(1234)
            fake_peer_2.hashfield.append(1235)
            fake_peer_3 = site.addPeer("1.2.3.6", 1546)
            fake_peer_3.hashfield.append(1235)
            fake_peer_3.hashfield.append(1236)

            res = peer_file_server.findHashIds([1234, 1235])
            assert sorted(res[1234]) == sorted([(file_server.ip_external, 1544), ("1.2.3.5", 1545)])
            assert sorted(res[1235]) == sorted([("1.2.3.5", 1545), ("1.2.3.6", 1546)])

            # Test my address adding
            site.content_manager.hashfield.append(1234)

            res = peer_file_server.findHashIds([1234, 1235])
            assert sorted(res[1234]) == sorted([(file_server.ip_external, 1544), ("1.2.3.5", 1545), (file_server.ip, 1544)])
            assert sorted(res[1235]) == sorted([("1.2.3.5", 1545), ("1.2.3.6", 1546)])
        finally:
            # Stop the client here as well, matching the other tests that build it
            client.stop()