forked from adafruit/circuitpython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasyncio_event_fair.py
49 lines (36 loc) · 892 Bytes
/
asyncio_event_fair.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# Test fairness of Event.set()
# That tasks which continuously wait on events don't take over the scheduler
try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit
# CIRCUITPY-CHANGE: CircuitPython provides __await__()
async def foo():
return 42
try:
foo().__await__
except AttributeError:
print("SKIP")
raise SystemExit
async def task1(id):
for i in range(4):
print("sleep", id)
await asyncio.sleep(0)
async def task2(id, ev):
for i in range(4):
ev.set()
ev.clear()
print("wait", id)
await ev.wait()
async def main():
ev = asyncio.Event()
tasks = [
asyncio.create_task(task1(0)),
asyncio.create_task(task2(2, ev)),
asyncio.create_task(task1(1)),
asyncio.create_task(task2(3, ev)),
]
await tasks[1]
ev.set()
asyncio.run(main())