-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_threading.py
126 lines (97 loc) · 2.79 KB
/
test_threading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import random
import asyncio
import threading
from aioutils import Group, Yielder, yielding
@asyncio.coroutine
def f(c):
    """Sleep for a random interval below 0.02 seconds, then return ``c``.

    Generator-based coroutine used as the unit of work by the tests in this
    module; the value passed in is handed back unchanged.
    """
    pause = 0.02 * random.random()
    yield from asyncio.sleep(pause)
    return c
def test_group_threading():
    """ Ensure that Group is thread-safe

    Five daemon threads each build a fresh ``Group``, spawn five ``f``
    coroutines on it and ``join()`` it, repeating until the main thread
    signals a stop after about 0.2 seconds.

    Exceptions raised inside a worker are recorded and the first one is
    re-raised in the calling thread: ``Thread.join()`` does not propagate
    them, so without this a crashing worker would leave the test green.
    Finally ``asyncio.Task.all_tasks()`` must be an empty set.
    """
    stop = threading.Event()
    errors = []  # exceptions captured inside worker threads

    def t():
        try:
            while not stop.is_set():
                g = Group()
                for i in range(5):
                    g.spawn(f(i))
                g.join()
                time.sleep(random.random()*0.02)
        except Exception as exc:
            # Hand the failure to the main thread instead of losing it.
            errors.append(exc)

    tasks = [threading.Thread(target=t, daemon=True) for _ in range(5)]
    for task in tasks:
        task.start()
    time.sleep(0.2)
    stop.set()
    for task in tasks:
        task.join()

    if errors:
        raise errors[0]
    assert asyncio.Task.all_tasks() == set(), asyncio.Task.all_tasks()
def test_yielder_threading():
    """ Ensure Yielder are thread safe

    Five daemon threads repeatedly create a ``Yielder``, spawn ``f(c)`` for
    every character of ``chars`` and collect everything ``y.yielding()``
    produces; the collected values must equal ``chars`` as a set.

    Exceptions raised inside a worker (including the ``assert`` above) are
    recorded and the first one is re-raised in the calling thread:
    ``Thread.join()`` does not propagate them, so a failing assertion in a
    worker would otherwise never fail the test. Finally
    ``asyncio.Task.all_tasks()`` must be an empty set.
    """
    chars = 'abcdefg'
    stop = threading.Event()
    errors = []  # exceptions captured inside worker threads

    def gen_func():
        y = Yielder()
        for c in chars:
            y.spawn(f(c))
        yield from y.yielding()

    def t():
        try:
            while not stop.is_set():
                chars2 = list(gen_func())
                assert set(chars2) == set(chars)
                time.sleep(random.random()*0.02)
        except Exception as exc:
            # Hand the failure to the main thread instead of losing it.
            errors.append(exc)

    tasks = [threading.Thread(target=t, daemon=True) for _ in range(5)]
    for task in tasks:
        task.start()
    time.sleep(0.2)
    stop.set()
    for task in tasks:
        task.join()

    if errors:
        raise errors[0]
    assert asyncio.Task.all_tasks() == set(), asyncio.Task.all_tasks()
def test_mixed():
    """ Ensure mixed usage are thread safe

    Five daemon threads repeatedly pick, at random, either ``f1`` (drain a
    ``Yielder`` into a list) or ``f2`` (spawn on a ``Group`` and ``join()``
    it). When ``f1`` was picked its result must equal ``chars`` as a set.

    Exceptions raised inside a worker (including that ``assert``) are
    recorded and the first one is re-raised in the calling thread:
    ``Thread.join()`` does not propagate them, so a failing worker would
    otherwise never fail the test. Finally ``asyncio.Task.all_tasks()``
    must be an empty set.
    """
    chars = 'abcdefg'
    stop = threading.Event()
    errors = []  # exceptions captured inside worker threads

    def f1():
        y = Yielder()
        for c in chars:
            y.spawn(f(c))
        return list(y.yielding())

    def f2():
        g = Group()
        for c in chars:
            g.spawn(f(c))
        g.join()

    def t():
        try:
            while not stop.is_set():
                # Named ``chosen`` so the module-level coroutine ``f`` is
                # not shadowed inside this worker.
                chosen = random.choice([f1, f2])
                r = chosen()
                if chosen is f1:
                    assert set(r) == set(chars)
                time.sleep(random.random()*0.02)
        except Exception as exc:
            # Hand the failure to the main thread instead of losing it.
            errors.append(exc)

    tasks = [threading.Thread(target=t, daemon=True) for _ in range(5)]
    for task in tasks:
        task.start()
    time.sleep(0.2)
    stop.set()
    for task in tasks:
        task.join()

    if errors:
        raise errors[0]
    assert asyncio.Task.all_tasks() == set(), asyncio.Task.all_tasks()
def test_yielding_size_in_threading():
    """Drain a ``yielding(2)`` block from a non-main thread.

    A single worker thread iterates a generator that spawns ``f(c)`` for each
    character of ``chars`` inside ``with yielding(2)`` and re-yields whatever
    the context object produces. After the thread finishes, the gathered
    values must equal ``chars`` as a set.
    """
    chars = 'abcdefgh'
    collected = []

    def f1():
        # NOTE(review): the argument 2 is presumably a size limit for the
        # yielding block - confirm against the aioutils documentation.
        with yielding(2) as y:
            for c in chars:
                y.spawn(f(c))
            yield from y

    def f2():
        collected.extend(f1())

    worker = threading.Thread(target=f2)
    worker.start()
    worker.join()

    assert set(collected) == set(chars)
if __name__ == '__main__':
    # When executed as a script, run every test once in file order.
    for _test in (
        test_group_threading,
        test_yielder_threading,
        test_mixed,
        test_yielding_size_in_threading,
    ):
        _test()