# test_rx_perf.py
# This module runs on gevent. monkey.patch_all() is applied before gevent,
# pycond, rx and threading are imported below -- presumably so that those
# imports see the patched standard library (confirm against the gevent docs).
from gevent import monkey
import time, sys
monkey.patch_all()
import gevent, pycond as pc
from rx.scheduler.eventloop import GEventScheduler
import rx.scheduler.eventloop as e
from threading import Event, current_thread as ct
# Disabled debugging helper, kept for reference:
# _thn = lambda msg, data: print('thread:', cur_thread().name, msg, data)
# tn() returns the name of the thread it is called on; the lookup functions in
# F store it so the test can tell on which thread each lookup ran.
tn = lambda: ct().name
# Scheduler instance passed as `scheduler=` to pc.rxop in the rx benchmarks.
GS = GEventScheduler(gevent)
# GS = None
# Rx is used to create the source observable (Rx.from_), rx supplies the
# operators (rx.map, rx.take).
Rx, rx = pc.import_rx()
# now:   clock used for all timing measurements.
# count: number of items pushed through every benchmark variant; set this
#        higher and watch memory usage staying constant.
# prnt:  when truthy, F.blocking prints each data dict it processes.
now, count, prnt = time.time, 10000, 0
class F:
    """Lookup functions handed to pycond as ``lookup_provider``.

    Each function stores the name of the thread it ran on in *data* and
    returns a ``(value, v)`` tuple. The functions are plain functions looked
    up on the class; they take no ``self``.
    """

    def odd(k, v, cfg, data, **kw):
        """Record the executing thread in ``data['odd']``; return ``(1, v)``."""
        thread_name = tn()  # add the thread name
        data['odd'] = thread_name
        return (1, v)

    def blocking(k, v, cfg, data, **kw):
        """Record the executing thread in ``data['blocking']``; return ``(3, v)``.

        Sleeps for one millisecond when ``data['i']`` is 0 and prints *data*
        when the module-level ``prnt`` flag is truthy.
        """
        data['blocking'] = tn()
        is_first_item = data['i'] == 0
        if is_first_item:
            # Only the very first item is delayed.
            time.sleep(0.001)
        if prnt:
            print(data)
        return (3, v)
# Shared progress-tracking state: clear() seeds it with 'lasti' and 'lastt',
# and the stats callback inside Tests.test_perf_compare reads and updates both.
perf = {}
def clear(p):
    """Reset the progress-tracking dict *p* in place.

    Removes every existing entry, then seeds ``'lasti'`` with 0 and
    ``'lastt'`` with the current time (``now()``).
    """
    # clear must actually be *called*: a bare `p.clear` attribute reference is
    # a no-op and would leave stale keys from the previous run in the dict.
    p.clear()
    p['lasti'] = 0
    p['lastt'] = now()
class Tests:
    """Timing comparison of one pycond condition evaluated in four ways."""

    # Condition under test; it references the ':odd' and ':blocking' lookups
    # provided by F.
    cond = [
        ['i', 'lt', 100000000],  # just make it a bit complex
        'and',
        [[':odd', 'eq', 1], 'and_not', ['i', 'eq', 2]],
        'and_not',
        [':blocking', 'eq', 3],
    ]
    # Named-condition list, in the form handed to pc.qualify and pc.rxop below.
    conds = [['mycond', cond]]

    def test_perf_compare(self):
        """Push ``count`` items through four evaluation variants and compare.

        Variants: ``direct`` (``pc.parse_cond``), ``qual`` (``pc.qualify``),
        ``rxsync`` (``pc.rxop`` without extra options) and ``rxasync``
        (``pc.rxop`` with ``asyn=['blocking']``). The wall-clock duration of
        each variant is stored under its function name, a summary table is
        printed, and finally ``rxasync`` is required to take less than 15
        times as long as ``direct``.

        Raises:
            AssertionError: if a variant returns too few items, the expected
                lookup results are missing, the thread expectations of the rx
                variants do not hold, or the final timing bound is exceeded.
        """
        res = {}
        print()

        def stats(m):
            """Print progress once more than 1000 items passed; return *m*."""
            i = m['i']
            if i - perf['lasti'] > 1000:
                perf['lasti'] = i
                print(i, now() - perf['lastt'])
                perf['lastt'] = now()
            return m

        def d(i):
            """Build the data dict for item number *i*."""
            return {'i': i}

        def _measure(f):
            """Run variant *f*, sanity-check its items, record its duration."""
            clear(perf)
            fn = f.__name__
            t0 = now()
            items = f()
            dt = now() - t0
            assert len(items) > count - 2
            assert 'odd' in items[-1] and 'blocking' in items[-1]
            # Item 2 fails the condition before ':blocking' is even looked up,
            # so items 0 and 2 must differ in their 'blocking' entry.
            zero_and_two = [m for m in items if m['i'] in (0, 2)]
            assert zero_and_two[0].get('blocking') != zero_and_two[1].get('blocking')
            print(fn, dt)
            res[fn] = dt

        def direct():
            """Call the parsed condition on every item in a plain loop."""
            pcd = pc.parse_cond(Tests.cond, lookup_provider=F)[0]
            items = [d(i) for i in range(count)]
            for m in items:
                pcd(state=m)
            return items

        def qual():
            """Call the pc.qualify result on every item in a plain loop."""
            pcn = pc.qualify(Tests.conds, lookup_provider=F)
            items = [d(i) for i in range(count)]
            for m in items:
                pcn(m)
            return items

        def _rxrun(**kw):
            """Run all items through an rx pipeline built around pc.rxop.

            Extra keyword arguments are forwarded to ``pc.rxop``. Blocks until
            the stream has completed and returns the collected items.
            """
            ev = Event()

            def unblock(*_):
                # Completion callback: wake up the waiting caller.
                ev.set()

            items = []
            rxcond = pc.rxop(
                Tests.conds,
                into='mod',
                scheduler=GS,
                lookup_provider=F,
                timeout=0.01,
                **kw
            )
            s = Rx.from_(range(count)).pipe(
                rx.map(d), rxcond, rx.take(count), rx.map(stats)
            )
            s.subscribe(items.append, on_completed=unblock)
            ev.wait()
            if not kw:
                # same thread
                assert items[-1]['odd'] == items[-1]['blocking']
                # 0 was sleeping a bit but we are sync
                assert items[0]['i'] == 0
            else:
                # ran not on same thread
                time.sleep(0.1)
                # Fail loudly: a debugger trap around this assertion would
                # swallow the failure and stall non-interactive runs.
                assert items[-1]['odd'] != items[-1]['blocking']
                # 0 was sleeping a bit, so it must not arrive first
                assert items[0]['i'] != 0
            return items

        def rxsync():
            """rx pipeline with pc.rxop's default options."""
            return _rxrun()

        def rxasync():
            """rx pipeline with the 'blocking' lookup declared as async."""
            return _rxrun(asyn=['blocking'])

        for variant in (direct, qual, rxsync, rxasync):
            _measure(variant)

        # NOTE: to see that memory goes down after the greenlets are done,
        # keep the process alive at this point (e.g. sleep in a loop) and
        # watch it from outside.
        head = '%s Items' % count
        print('\n'.join(('', head, '=' * len(head))))
        for k, v in res.items():
            print('%9s' % k, v)
        # Yes, we are about 10 times slower when all items are processed
        # async. Doing this with
        # rx.group_by(needs_async) -> flat_map(s.pipe(map(rx.just(x, GS))))
        # was far slower still, so roughly 10k items / sec is acceptable.
        assert res['rxasync'] < 15 * res['direct']
if __name__ == '__main__':
    # Script entry point: run the benchmark directly, without a test runner.
    suite = Tests()
    suite.test_perf_compare()