from __future__ import annotations
import asyncio
import contextlib
import functools
from dataclasses import dataclass, field, replace
from pathlib import Path
from shutil import copy
from typing import Any, AsyncIterator, Callable, List, Optional, Tuple
import pytest
from chia_rs import G1Element
from chia.farmer.farmer import Farmer
from chia.harvester.harvester import Harvester
from chia.plot_sync.delta import Delta, PathListDelta, PlotListDelta
from chia.plot_sync.receiver import Receiver
from chia.plot_sync.sender import Sender
from chia.plot_sync.util import Constants, State
from chia.plotting.manager import PlotManager
from chia.plotting.util import add_plot_directory, remove_plot_directory
from chia.protocols.harvester_protocol import Plot
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.simulator.block_tools import BlockTools
from chia.types.aliases import FarmerService, HarvesterService
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.config import create_default_chia_config, lock_and_load_config, save_config
from chia.util.ints import uint8, uint32, uint64
from chia.util.misc import SplitAsyncManager, split_async_manager
from chia.util.streamable import _T_Streamable
from tests.plot_sync.util import start_harvester_service
from tests.plotting.test_plot_manager import Directory, MockPlotInfo
from tests.plotting.util import get_test_plots
from tests.util.time_out_assert import time_out_assert
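# Check whether a full sync cycle has completed: the sender finished a sync with a new, non-zero id,
# the receiver reports that same id with an idle state, and no sync is currently active.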
def synced(sender: Sender, receiver: Receiver, previous_last_sync_id: int) -> bool:
return (
sender._last_sync_id != previous_last_sync_id
and sender._last_sync_id == receiver._last_sync.sync_id != 0
and receiver.current_sync().state == State.idle
and not sender.sync_active()
)
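# Assert that both path lists contain the same entries, independent of ordering.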
def assert_path_list_matches(expected_list: List[str], actual_list: List[str]) -> None:
assert len(expected_list) == len(actual_list)
for item in expected_list:
assert str(item) in actual_list
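# Tracks the counts and deltas we expect a harvester to report for each plot state
# (valid / invalid / keys missing / duplicates) after the next sync, plus a flag set
# by the update callback once all delta assertions passed.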
@dataclass
class ExpectedResult:
valid_count: int = 0
valid_delta: PlotListDelta = field(default_factory=PlotListDelta)
invalid_count: int = 0
invalid_delta: PathListDelta = field(default_factory=PathListDelta)
keys_missing_count: int = 0
keys_missing_delta: PathListDelta = field(default_factory=PathListDelta)
duplicates_count: int = 0
duplicates_delta: PathListDelta = field(default_factory=PathListDelta)
callback_passed: bool = False
def add_valid(self, list_plots: List[MockPlotInfo]) -> None:
def create_mock_plot(info: MockPlotInfo) -> Plot:
return Plot(
info.prover.get_filename(),
uint8(0),
bytes32(b"\x00" * 32),
None,
None,
G1Element(),
uint64(0),
uint64(0),
uint8(0),
)
self.valid_count += len(list_plots)
self.valid_delta.additions.update({x.prover.get_filename(): create_mock_plot(x) for x in list_plots})
def remove_valid(self, list_paths: List[Path]) -> None:
self.valid_count -= len(list_paths)
self.valid_delta.removals += [str(x) for x in list_paths]
def add_invalid(self, list_paths: List[Path]) -> None:
self.invalid_count += len(list_paths)
self.invalid_delta.additions += [str(x) for x in list_paths]
def remove_invalid(self, list_paths: List[Path]) -> None:
self.invalid_count -= len(list_paths)
self.invalid_delta.removals += [str(x) for x in list_paths]
def add_keys_missing(self, list_paths: List[Path]) -> None:
self.keys_missing_count += len(list_paths)
self.keys_missing_delta.additions += [str(x) for x in list_paths]
def remove_keys_missing(self, list_paths: List[Path]) -> None:
self.keys_missing_count -= len(list_paths)
self.keys_missing_delta.removals += [str(x) for x in list_paths]
def add_duplicates(self, list_paths: List[Path]) -> None:
self.duplicates_count += len(list_paths)
self.duplicates_delta.additions += [str(x) for x in list_paths]
def remove_duplicates(self, list_paths: List[Path]) -> None:
self.duplicates_count -= len(list_paths)
self.duplicates_delta.removals += [str(x) for x in list_paths]
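# Test environment bundling the farmer, the harvesters, the test plot directories and the
# expected per-harvester results, with helpers to mutate directories and validate sync runs.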
@dataclass
class Environment:
root_path: Path
harvester_services: List[HarvesterService]
farmer_service: FarmerService
harvesters: List[Harvester]
farmer: Farmer
dir_1: Directory
dir_2: Directory
dir_3: Directory
dir_4: Directory
dir_invalid: Directory
dir_keys_missing: Directory
dir_duplicates: Directory
expected: List[ExpectedResult]
split_farmer_service_manager: SplitAsyncManager[FarmerService]
split_harvester_managers: List[SplitAsyncManager[Harvester]]
def get_harvester(self, peer_id: bytes32) -> Optional[Harvester]:
for harvester in self.harvesters:
assert harvester.server is not None
if harvester.server.node_id == peer_id:
return harvester
return None
def add_directory(self, harvester_index: int, directory: Directory, state: State = State.loaded) -> None:
try:
add_plot_directory(self.harvesters[harvester_index].root_path, str(directory.path))
except ValueError:
pass
if state == State.loaded:
self.expected[harvester_index].add_valid(directory.plot_info_list())
elif state == State.invalid:
self.expected[harvester_index].add_invalid(directory.path_list())
elif state == State.keys_missing:
self.expected[harvester_index].add_keys_missing(directory.path_list())
elif state == State.duplicates:
self.expected[harvester_index].add_duplicates(directory.path_list())
else:
assert False, "Invalid state"
def remove_directory(self, harvester_index: int, directory: Directory, state: State = State.removed) -> None:
remove_plot_directory(self.harvesters[harvester_index].root_path, str(directory.path))
if state == State.removed:
self.expected[harvester_index].remove_valid(directory.path_list())
elif state == State.invalid:
self.expected[harvester_index].remove_invalid(directory.path_list())
elif state == State.keys_missing:
self.expected[harvester_index].remove_keys_missing(directory.path_list())
elif state == State.duplicates:
self.expected[harvester_index].remove_duplicates(directory.path_list())
else:
assert False, "Invalid state"
def add_all_directories(self, harvester_index: int) -> None:
self.add_directory(harvester_index, self.dir_1)
self.add_directory(harvester_index, self.dir_2)
self.add_directory(harvester_index, self.dir_3)
self.add_directory(harvester_index, self.dir_4)
self.add_directory(harvester_index, self.dir_keys_missing, State.keys_missing)
self.add_directory(harvester_index, self.dir_invalid, State.invalid)
# Note: This does not add dir_duplicates since it's important that the duplicated plots are loaded
# after the original ones.
# self.add_directory(harvester_index, self.dir_duplicates, State.duplicates)
def remove_all_directories(self, harvester_index: int) -> None:
self.remove_directory(harvester_index, self.dir_1)
self.remove_directory(harvester_index, self.dir_2)
self.remove_directory(harvester_index, self.dir_3)
self.remove_directory(harvester_index, self.dir_4)
self.remove_directory(harvester_index, self.dir_keys_missing, State.keys_missing)
self.remove_directory(harvester_index, self.dir_invalid, State.invalid)
self.remove_directory(harvester_index, self.dir_duplicates, State.duplicates)
async def plot_sync_callback(self, peer_id: bytes32, delta: Optional[Delta]) -> None:
if delta is None:
return
harvester: Optional[Harvester] = self.get_harvester(peer_id)
assert harvester is not None
expected = self.expected[self.harvesters.index(harvester)]
assert len(expected.valid_delta.additions) == len(delta.valid.additions)
for path, plot_info in expected.valid_delta.additions.items():
assert path in delta.valid.additions
plot = harvester.plot_manager.plots.get(Path(path), None)
assert plot is not None
assert plot.prover.get_filename() == delta.valid.additions[path].filename
assert plot.prover.get_size() == delta.valid.additions[path].size
assert plot.prover.get_id() == delta.valid.additions[path].plot_id
assert plot.prover.get_compression_level() == delta.valid.additions[path].compression_level
assert plot.pool_public_key == delta.valid.additions[path].pool_public_key
assert plot.pool_contract_puzzle_hash == delta.valid.additions[path].pool_contract_puzzle_hash
assert plot.plot_public_key == delta.valid.additions[path].plot_public_key
assert plot.file_size == delta.valid.additions[path].file_size
assert int(plot.time_modified) == delta.valid.additions[path].time_modified
assert_path_list_matches(expected.valid_delta.removals, delta.valid.removals)
assert_path_list_matches(expected.invalid_delta.additions, delta.invalid.additions)
assert_path_list_matches(expected.invalid_delta.removals, delta.invalid.removals)
assert_path_list_matches(expected.keys_missing_delta.additions, delta.keys_missing.additions)
assert_path_list_matches(expected.keys_missing_delta.removals, delta.keys_missing.removals)
assert_path_list_matches(expected.duplicates_delta.additions, delta.duplicates.additions)
assert_path_list_matches(expected.duplicates_delta.removals, delta.duplicates.removals)
expected.valid_delta.clear()
expected.invalid_delta.clear()
expected.keys_missing_delta.clear()
expected.duplicates_delta.clear()
expected.callback_passed = True
async def run_sync_test(self) -> None:
plot_manager: PlotManager
assert len(self.harvesters) == len(self.expected)
last_sync_ids: List[uint64] = []
# Run the test in two steps: first trigger the refresh on both harvesters
for harvester in self.harvesters:
plot_manager = harvester.plot_manager
assert harvester.server is not None
receiver = self.farmer.plot_sync_receivers[harvester.server.node_id]
# Always reset the passed flag before a new run
self.expected[self.harvesters.index(harvester)].callback_passed = False
receiver._update_callback = self.plot_sync_callback
assert harvester.plot_sync_sender._last_sync_id == receiver._last_sync.sync_id
last_sync_ids.append(harvester.plot_sync_sender._last_sync_id)
plot_manager.start_refreshing()
plot_manager.trigger_refresh()
# Then wait for them to be synced with the farmer and validate them
for harvester in self.harvesters:
plot_manager = harvester.plot_manager
assert harvester.server is not None
receiver = self.farmer.plot_sync_receivers[harvester.server.node_id]
await time_out_assert(20, plot_manager.needs_refresh, value=False)
harvester_index = self.harvesters.index(harvester)
await time_out_assert(
10, synced, True, harvester.plot_sync_sender, receiver, last_sync_ids[harvester_index]
)
expected = self.expected[harvester_index]
assert plot_manager.plot_count() == len(receiver.plots()) == expected.valid_count
assert len(plot_manager.failed_to_open_filenames) == len(receiver.invalid()) == expected.invalid_count
assert len(plot_manager.no_key_filenames) == len(receiver.keys_missing()) == expected.keys_missing_count
assert len(plot_manager.get_duplicates()) == len(receiver.duplicates()) == expected.duplicates_count
assert expected.callback_passed
assert expected.valid_delta.empty()
assert expected.invalid_delta.empty()
assert expected.keys_missing_delta.empty()
assert expected.duplicates_delta.empty()
for path, plot_info in plot_manager.plots.items():
assert str(path) in receiver.plots()
assert plot_info.prover.get_filename() == receiver.plots()[str(path)].filename
assert plot_info.prover.get_size() == receiver.plots()[str(path)].size
assert plot_info.prover.get_id() == receiver.plots()[str(path)].plot_id
assert plot_info.prover.get_compression_level() == receiver.plots()[str(path)].compression_level
assert plot_info.pool_public_key == receiver.plots()[str(path)].pool_public_key
assert plot_info.pool_contract_puzzle_hash == receiver.plots()[str(path)].pool_contract_puzzle_hash
assert plot_info.plot_public_key == receiver.plots()[str(path)].plot_public_key
assert plot_info.file_size == receiver.plots()[str(path)].file_size
assert int(plot_info.time_modified) == receiver.plots()[str(path)].time_modified
for path in plot_manager.failed_to_open_filenames:
assert str(path) in receiver.invalid()
for path in plot_manager.no_key_filenames:
assert str(path) in receiver.keys_missing()
for path in plot_manager.get_duplicates():
assert str(path) in receiver.duplicates()
async def handshake_done(self, index: int) -> bool:
return (
self.harvesters[index].plot_manager._refresh_thread is not None
and len(self.harvesters[index].plot_manager.farmer_public_keys) > 0
)
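# Fixture that sets up a farmer with two harvesters and test plot directories (including invalid,
# keys-missing and duplicate plots), clears the harvesters' plot directory configs, and yields the
# assembled Environment.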
@pytest.fixture(scope="function")
async def environment(
tmp_path: Path,
farmer_two_harvester_not_started: Tuple[List[HarvesterService], FarmerService, BlockTools],
) -> AsyncIterator[Environment]:
def new_test_dir(name: str, plot_list: List[Path]) -> Directory:
return Directory(tmp_path / "plots" / name, plot_list)
plots: List[Path] = get_test_plots()
plots_invalid: List[Path] = get_test_plots()[0:3]
plots_keys_missing: List[Path] = get_test_plots("not_in_keychain")
# Create 4 directories where dir_n contains n plots
directories: List[Directory] = []
offset: int = 0
while len(directories) < 4:
dir_number = len(directories) + 1
directories.append(new_test_dir(f"{dir_number}", plots[offset : offset + dir_number]))
offset += dir_number
dir_invalid: Directory = new_test_dir("invalid", plots_invalid)
dir_keys_missing: Directory = new_test_dir("keys_missing", plots_keys_missing)
dir_duplicates: Directory = new_test_dir("duplicates", directories[3].plots)
create_default_chia_config(tmp_path)
# Invalidate the plots in `dir_invalid`
for path in dir_invalid.path_list():
with open(path, "wb") as file:
file.write(bytes(100))
harvester_services, farmer_service, bt = farmer_two_harvester_not_started
farmer_service.reconnect_retry_seconds = 1
farmer: Farmer = farmer_service._node
async with split_async_manager(manager=farmer_service.manage(), object=farmer_service) as split_farmer_manager:
await split_farmer_manager.enter()
async with contextlib.AsyncExitStack() as async_exit_stack:
split_harvester_managers = [
await async_exit_stack.enter_async_context(start_harvester_service(service, farmer_service))
for service in harvester_services
]
harvesters = [manager.object for manager in split_harvester_managers]
for harvester in harvesters:
# Remove the default plot directory for these tests
with lock_and_load_config(harvester.root_path, "config.yaml") as config:
config["harvester"]["plot_directories"] = []
save_config(harvester.root_path, "config.yaml", config)
harvester.plot_manager.set_public_keys(
bt.plot_manager.farmer_public_keys.copy(), bt.plot_manager.pool_public_keys.copy()
)
assert len(farmer.plot_sync_receivers) == 2
yield Environment(
tmp_path,
harvester_services,
farmer_service,
harvesters,
farmer,
directories[0],
directories[1],
directories[2],
directories[3],
dir_invalid,
dir_keys_missing,
dir_duplicates,
[ExpectedResult() for _ in harvesters],
split_farmer_service_manager=split_farmer_manager,
split_harvester_managers=split_harvester_managers,
)
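# Add, drop and remove valid plots on both harvesters and verify each state change syncs to the farmer.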
@pytest.mark.anyio
async def test_sync_valid(environment: Environment) -> None:
env: Environment = environment
env.add_directory(0, env.dir_1)
env.add_directory(1, env.dir_2)
await env.run_sync_test()
# Run two more times to make sure we still get the same results across repeated refresh intervals
env.expected[0].valid_delta.clear()
env.expected[1].valid_delta.clear()
await env.run_sync_test()
await env.run_sync_test()
env.add_directory(0, env.dir_3)
env.add_directory(1, env.dir_4)
await env.run_sync_test()
while len(env.dir_3.path_list()):
drop_plot = env.dir_3.path_list()[0]
drop_plot.unlink()
env.dir_3.drop(drop_plot)
env.expected[0].remove_valid([drop_plot])
await env.run_sync_test()
env.remove_directory(0, env.dir_3)
await env.run_sync_test()
env.remove_directory(1, env.dir_4)
await env.run_sync_test()
env.remove_directory(0, env.dir_1)
env.remove_directory(1, env.dir_2)
await env.run_sync_test()
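# Verify that invalid plots are reported correctly and become valid again once a good copy is restored.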
@pytest.mark.anyio
async def test_sync_invalid(environment: Environment) -> None:
env: Environment = environment
assert len(env.farmer.plot_sync_receivers) == 2
# Use dir_3 and dir_4 in this test because the invalid plots are copies from dir_1 + dir_2
env.add_directory(0, env.dir_3)
env.add_directory(0, env.dir_invalid, State.invalid)
env.add_directory(1, env.dir_4)
await env.run_sync_test()
# Run two more times to make sure we still get the same results across repeated refresh intervals
await env.run_sync_test()
await env.run_sync_test()
# Drop all but two of the invalid plots
assert len(env.dir_invalid) > 2
for _ in range(len(env.dir_invalid) - 2):
drop_plot = env.dir_invalid.path_list()[0]
drop_plot.unlink()
env.dir_invalid.drop(drop_plot)
env.expected[0].remove_invalid([drop_plot])
await env.run_sync_test()
assert len(env.dir_invalid) == 2
# Add the directory to the second harvester as well
env.add_directory(1, env.dir_invalid, State.invalid)
await env.run_sync_test()
# Recover one of the remaining invalid plots
for path in get_test_plots():
if path.name == env.dir_invalid.path_list()[0].name:
copy(path, env.dir_invalid.path)
for i in range(len(env.harvesters)):
env.expected[i].add_valid([env.dir_invalid.plot_info_list()[0]])
env.expected[i].remove_invalid([env.dir_invalid.path_list()[0]])
env.harvesters[i].plot_manager.refresh_parameter = replace(
env.harvesters[i].plot_manager.refresh_parameter, retry_invalid_seconds=uint32(0)
)
await env.run_sync_test()
for i in [0, 1]:
remove_plot_directory(env.harvesters[i].root_path, str(env.dir_invalid.path))
env.expected[i].remove_valid([env.dir_invalid.path_list()[0]])
env.expected[i].remove_invalid([env.dir_invalid.path_list()[1]])
await env.run_sync_test()
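# Verify that plots with missing keys are reported correctly and become valid once the keys are added.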
@pytest.mark.anyio
async def test_sync_keys_missing(environment: Environment) -> None:
env: Environment = environment
env.add_directory(0, env.dir_1)
env.add_directory(0, env.dir_keys_missing, State.keys_missing)
env.add_directory(1, env.dir_2)
await env.run_sync_test()
# Run two more times to make sure we still get the same results across repeated refresh intervals
await env.run_sync_test()
await env.run_sync_test()
# Drop all but 2 plots with missing keys and test sync in between
assert len(env.dir_keys_missing) > 2
for _ in range(len(env.dir_keys_missing) - 2):
drop_plot = env.dir_keys_missing.path_list()[0]
drop_plot.unlink()
env.dir_keys_missing.drop(drop_plot)
env.expected[0].remove_keys_missing([drop_plot])
await env.run_sync_test()
assert len(env.dir_keys_missing) == 2
# Add the plots with missing keys to the other harvester
env.add_directory(0, env.dir_3)
env.add_directory(1, env.dir_keys_missing, State.keys_missing)
await env.run_sync_test()
# Add the missing keys to the first harvester's plot manager
env.harvesters[0].plot_manager.farmer_public_keys.append(G1Element())
env.harvesters[0].plot_manager.pool_public_keys.append(G1Element())
# And validate they become valid now
env.expected[0].add_valid(env.dir_keys_missing.plot_info_list())
env.expected[0].remove_keys_missing(env.dir_keys_missing.path_list())
await env.run_sync_test()
# Drop the valid plots from one harvester and the keys missing plots from the other harvester
env.remove_directory(0, env.dir_keys_missing)
env.remove_directory(1, env.dir_keys_missing, State.keys_missing)
await env.run_sync_test()
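# Verify duplicate detection: plots loaded from a second directory are reported as duplicates and
# become valid once the originally loaded directory is removed.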
@pytest.mark.anyio
async def test_sync_duplicates(environment: Environment) -> None:
env: Environment = environment
# dir_4 and dir_duplicates contain the same plots. Load dir_4 first to make sure the plots seen as
# duplicates are the ones from dir_duplicates.
env.add_directory(0, env.dir_4)
await env.run_sync_test()
env.add_directory(0, env.dir_duplicates, State.duplicates)
env.add_directory(1, env.dir_2)
await env.run_sync_test()
# Run two more times to make sure we still get the same results across repeated refresh intervals
await env.run_sync_test()
await env.run_sync_test()
# Drop all but 2 duplicates and test sync in between
assert len(env.dir_duplicates) > 2
for _ in range(len(env.dir_duplicates) - 2):
drop_plot = env.dir_duplicates.path_list()[0]
drop_plot.unlink()
env.dir_duplicates.drop(drop_plot)
env.expected[0].remove_duplicates([drop_plot])
await env.run_sync_test()
assert len(env.dir_duplicates) == 2
# Removing dir_4 now causes the plots in dir_duplicates to become loaded instead
env.remove_directory(0, env.dir_4)
env.expected[0].remove_duplicates(env.dir_duplicates.path_list())
env.expected[0].add_valid(env.dir_duplicates.plot_info_list())
await env.run_sync_test()
async def add_and_validate_all_directories(env: Environment) -> None:
# Add all available directories to both harvesters and make sure they load and get synced
env.add_all_directories(0)
env.add_all_directories(1)
await env.run_sync_test()
env.add_directory(0, env.dir_duplicates, State.duplicates)
env.add_directory(1, env.dir_duplicates, State.duplicates)
await env.run_sync_test()
async def remove_and_validate_all_directories(env: Environment) -> None:
# Remove all available directories from both harvesters and make sure they are removed and get synced
env.remove_all_directories(0)
env.remove_all_directories(1)
await env.run_sync_test()
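# Verify that adding and then removing all directory types on both harvesters syncs correctly.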
@pytest.mark.anyio
async def test_add_and_remove_all_directories(environment: Environment) -> None:
await add_and_validate_all_directories(environment)
await remove_and_validate_all_directories(environment)
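# Verify that a harvester restart drops its receiver on the farmer and that a fresh sync
# recovers the full state afterwards.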
@pytest.mark.anyio
async def test_harvester_restart(environment: Environment) -> None:
env: Environment = environment
# Load all directories for both harvesters
await add_and_validate_all_directories(env)
# Stop the harvester and make sure the receiver gets dropped on the farmer and refreshing gets stopped
await env.split_harvester_managers[0].exit()
assert len(env.farmer.plot_sync_receivers) == 1
assert not env.harvesters[0].plot_manager._refreshing_enabled
assert not env.harvesters[0].plot_manager.needs_refresh()
# Start the harvester, wait for the handshake and make sure the receiver comes back
async with start_harvester_service(env.harvester_services[0], env.farmer_service):
await time_out_assert(5, env.handshake_done, True, 0)
assert len(env.farmer.plot_sync_receivers) == 2
# Remove the duplicates dir to avoid conflicts with the original plots
env.remove_directory(0, env.dir_duplicates)
# Reset the expected data for harvester 0 and re-add all directories because of the restart
env.expected[0] = ExpectedResult()
env.add_all_directories(0)
# Run the refresh twice and make sure everything recovers and stays recovered after the harvester restart
await env.run_sync_test()
env.add_directory(0, env.dir_duplicates, State.duplicates)
await env.run_sync_test()
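# Verify that a farmer restart drops all receivers and that the reconnect handshake
# triggers a re-sync which restores the full state.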
@pytest.mark.anyio
async def test_farmer_restart(environment: Environment) -> None:
env: Environment = environment
# Load all directories for both harvesters
await add_and_validate_all_directories(env)
last_sync_ids: List[uint64] = []
for i in range(0, len(env.harvesters)):
last_sync_ids.append(env.harvesters[i].plot_sync_sender._last_sync_id)
# Stop the farmer and make sure both receivers get dropped and refreshing gets stopped on the harvesters
await env.split_farmer_service_manager.exit()
assert len(env.farmer.plot_sync_receivers) == 0
assert not env.harvesters[0].plot_manager._refreshing_enabled
assert not env.harvesters[1].plot_manager._refreshing_enabled
# Start the farmer, wait for the handshake and make sure the receivers come back
async with env.farmer_service.manage():
await time_out_assert(5, env.handshake_done, True, 0)
await time_out_assert(5, env.handshake_done, True, 1)
assert len(env.farmer.plot_sync_receivers) == 2
# Do not use run_sync_test here; for a more realistic test scenario, just wait for the harvesters
# to be synced. The handshake should trigger the re-sync.
for i in range(0, len(env.harvesters)):
harvester: Harvester = env.harvesters[i]
assert harvester.server is not None
receiver = env.farmer.plot_sync_receivers[harvester.server.node_id]
await time_out_assert(20, synced, True, harvester.plot_sync_sender, receiver, last_sync_ids[i])
# Validate the sync
for harvester in env.harvesters:
plot_manager: PlotManager = harvester.plot_manager
assert harvester.server is not None
receiver = env.farmer.plot_sync_receivers[harvester.server.node_id]
expected = env.expected[env.harvesters.index(harvester)]
assert plot_manager.plot_count() == len(receiver.plots()) == expected.valid_count
assert len(plot_manager.failed_to_open_filenames) == len(receiver.invalid()) == expected.invalid_count
assert len(plot_manager.no_key_filenames) == len(receiver.keys_missing()) == expected.keys_missing_count
assert len(plot_manager.get_duplicates()) == len(receiver.duplicates()) == expected.duplicates_count
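# Verify that triggering a new sync and disconnecting during an active sync aborts it cleanly
# and that a subsequent sync succeeds after the reconnect.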
@pytest.mark.anyio
async def test_sync_start_and_disconnect_while_sync_is_active(
farmer_one_harvester: Tuple[List[HarvesterService], FarmerService, BlockTools]
) -> None:
harvesters, farmer_service, _ = farmer_one_harvester
harvester_service = harvesters[0]
harvester = harvester_service._node
farmer: Farmer = farmer_service._node
Constants.message_timeout = 3
async def receiver_available() -> bool:
return harvester.server.node_id in farmer.plot_sync_receivers
async def disconnecting_process(
self: Receiver, method: Callable[[_T_Streamable], Any], message_type: ProtocolMessageTypes, message: Any
) -> None:
if self.current_sync().state == State.loaded:
harvester.plot_manager.trigger_refresh()
await asyncio.sleep(2)
await self.connection().close()
return
await original_process(method, message_type, message)
# Wait for the receiver to show up
await time_out_assert(20, receiver_available)
receiver = farmer.plot_sync_receivers[harvester.server.node_id]
# And wait until the first sync from the harvester to the farmer is done
await time_out_assert(20, receiver.initial_sync, False)
# Replace `Receiver._process` with `disconnecting_process`, which triggers a plot manager refresh
# and disconnects the farmer from the harvester during an active sync.
original_process = receiver._process
receiver._process = functools.partial(disconnecting_process, receiver) # type: ignore[method-assign]
# Trigger the refresh, which leads to a new sync_start during the active sync.
harvester.plot_manager.trigger_refresh()
await time_out_assert(20, harvester.plot_sync_sender.sync_active)
# Now wait until the receiver disappears from the farmer's plot_sync_receivers, which means it's disconnected.
await time_out_assert(20, receiver_available, False)
# Wait until the sync has been aborted
await time_out_assert(20, harvester.plot_sync_sender.sync_active, False)
# And then wait for the harvester to reconnect and the receiver to re-appear.
await time_out_assert(20, receiver_available, True)
# Make sure the receiver object has been changed because of the disconnect
assert farmer.plot_sync_receivers[harvester.server.node_id] is not receiver
receiver = farmer.plot_sync_receivers[harvester.server.node_id]
current_last_sync_id = receiver.last_sync().sync_id
# Now start another sync and wait for it to be done to make sure everything still works fine
harvester.plot_manager.trigger_refresh()
await time_out_assert(20, synced, True, harvester.plot_sync_sender, receiver, current_last_sync_id)