forked from Chia-Network/chia-blockchain
-
Notifications
You must be signed in to change notification settings - Fork 0
/
time_out_assert.py
52 lines (41 loc) · 1.67 KB
/
time_out_assert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio
import logging
import time
from typing import Callable
from chia.protocols.protocol_message_types import ProtocolMessageTypes
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
async def time_out_assert_custom_interval(timeout: int, interval, function, value=True, *args, **kwargs):
    """Poll ``function`` until it returns ``value`` or ``timeout`` seconds elapse.

    ``function`` is called with ``*args`` and ``**kwargs`` once per iteration and
    is awaited when it is a coroutine function. After every call whose result
    does not equal ``value`` the coroutine sleeps for ``interval`` seconds.

    Args:
        timeout: Maximum number of seconds to keep polling.
        interval: Number of seconds to sleep between two polls.
        function: Callable or coroutine function to poll.
        value: Result that ends the wait; compared with ``value == result``.
        *args: Positional arguments forwarded to ``function``.
        **kwargs: Keyword arguments forwarded to ``function``.

    Returns:
        None, as soon as one call returns a result equal to ``value``.

    Raises:
        AssertionError: If no call returned ``value`` before the timeout.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    start = time.monotonic()
    # Whether the callable must be awaited does not change between polls.
    needs_await = asyncio.iscoroutinefunction(function)
    polled = False
    f_res = None
    while time.monotonic() - start < timeout:
        if needs_await:
            f_res = await function(*args, **kwargs)
        else:
            f_res = function(*args, **kwargs)
        polled = True
        if value == f_res:
            return None
        await asyncio.sleep(interval)

    outcome = f"last result: {f_res!r}" if polled else "function was never called"
    # Raise explicitly: a bare ``assert`` statement is removed under ``python -O``,
    # which would turn a timeout into a silent success.
    raise AssertionError(
        f"Timed out after {timeout} seconds waiting for {function!r} to return {value!r}; {outcome}"
    )
async def time_out_assert(timeout: int, function, value=True, *args, **kwargs):
    """Poll ``function`` every 0.05 seconds until it returns ``value``.

    Thin wrapper around ``time_out_assert_custom_interval`` with a fixed
    polling interval of 0.05 seconds.

    Args:
        timeout: Maximum number of seconds to keep polling.
        function: Callable or coroutine function to poll.
        value: Result that ends the wait.
        *args: Positional arguments forwarded to ``function``.
        **kwargs: Keyword arguments forwarded to ``function``.
    """
    # Keyword arguments must be forwarded with ``**``; unpacking them with a
    # single ``*`` would pass only the dict keys, as extra positional arguments.
    await time_out_assert_custom_interval(timeout, 0.05, function, value, *args, **kwargs)
async def time_out_assert_not_none(timeout: int, function, *args, **kwargs):
    """Poll ``function`` until it returns something other than ``None``.

    ``function`` is called with ``*args`` and ``**kwargs`` once per iteration and
    is awaited when it is a coroutine function. After every call that returns
    ``None`` the coroutine sleeps for 0.05 seconds.

    Args:
        timeout: Maximum number of seconds to keep polling.
        function: Callable or coroutine function to poll.
        *args: Positional arguments forwarded to ``function``.
        **kwargs: Keyword arguments forwarded to ``function``.

    Returns:
        None, as soon as one call returns a non-``None`` result.

    Raises:
        AssertionError: If every call returned ``None`` until the timeout.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    start = time.monotonic()
    # Whether the callable must be awaited does not change between polls.
    needs_await = asyncio.iscoroutinefunction(function)
    while time.monotonic() - start < timeout:
        if needs_await:
            f_res = await function(*args, **kwargs)
        else:
            f_res = function(*args, **kwargs)
        if f_res is not None:
            return None
        await asyncio.sleep(0.05)

    # Raise explicitly: a bare ``assert`` statement is removed under ``python -O``,
    # which would turn a timeout into a silent success.
    raise AssertionError(
        f"Timed out after {timeout} seconds waiting for {function!r} to return a non-None value"
    )
def time_out_messages(incoming_queue: asyncio.Queue, msg_name: str, count: int = 1) -> Callable:
    """Build an async predicate that checks the next ``count`` queued messages.

    The returned coroutine function takes no arguments. It returns ``False``
    when fewer than ``count`` items are queued. Otherwise it takes items off
    ``incoming_queue`` one at a time and returns ``False`` at the first item
    whose message type name differs from ``msg_name``, or ``True`` once
    ``count`` items have matched.

    Args:
        incoming_queue: Queue to read from. Each item is indexed with ``[0]``
            and that element's ``type`` attribute is read.
        msg_name: Expected ``ProtocolMessageTypes`` member name.
        count: Number of consecutive messages that must match.

    Returns:
        The async predicate described above.
    """

    async def bool_f():
        enough_queued = incoming_queue.qsize() >= count
        if not enough_queued:
            return False

        for _ in range(count):
            # Items are removed from the queue as they are inspected, so a
            # mismatching item (and any matching ones before it) is consumed.
            queued_item = await incoming_queue.get()
            type_value = queued_item[0].type
            found_name = ProtocolMessageTypes(type_value).name
            if found_name != msg_name:
                return False

        return True

    return bool_f