forked from HelloZeroNet/ZeroNet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNoparallel.py
139 lines (117 loc) · 4.39 KB
/
Noparallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import gevent
import time
class Noparallel(object): # Only allow function running once in same time
def __init__(self, blocking=True):
self.threads = {}
self.blocking = blocking # Blocking: Acts like normal function else thread returned
def __call__(self, func):
def wrapper(*args, **kwargs):
key = (func, tuple(args), str(kwargs)) # Unique key for function including parameters
if key in self.threads: # Thread already running (if using blocking mode)
thread = self.threads[key]
if self.blocking:
thread.join() # Blocking until its finished
return thread.value # Return the value
else: # No blocking
if thread.ready(): # Its finished, create a new
thread = gevent.spawn(func, *args, **kwargs)
self.threads[key] = thread
return thread
else: # Still running
return thread
else: # Thread not running
thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread
thread.link(lambda thread: self.cleanup(key, thread))
self.threads[key] = thread
if self.blocking: # Wait for finish
thread.join()
ret = thread.value
return ret
else: # No blocking just return the thread
return thread
wrapper.func_name = func.func_name
return wrapper
# Cleanup finished threads
def cleanup(self, key, thread):
if key in self.threads:
del(self.threads[key])
if __name__ == "__main__":
class Test():
@Noparallel()
def count(self, num=5):
for i in range(num):
print self, i
time.sleep(1)
return "%s return:%s" % (self, i)
class TestNoblock():
@Noparallel(blocking=False)
def count(self, num=5):
for i in range(num):
print self, i
time.sleep(1)
return "%s return:%s" % (self, i)
def testBlocking():
test = Test()
test2 = Test()
print "Counting..."
print "Creating class1/thread1"
thread1 = gevent.spawn(test.count)
print "Creating class1/thread2 (ignored)"
thread2 = gevent.spawn(test.count)
print "Creating class2/thread3"
thread3 = gevent.spawn(test2.count)
print "Joining class1/thread1"
thread1.join()
print "Joining class1/thread2"
thread2.join()
print "Joining class2/thread3"
thread3.join()
print "Creating class1/thread4 (its finished, allowed again)"
thread4 = gevent.spawn(test.count)
print "Joining thread4"
thread4.join()
print thread1.value, thread2.value, thread3.value, thread4.value
print "Done."
def testNoblocking():
test = TestNoblock()
test2 = TestNoblock()
print "Creating class1/thread1"
thread1 = test.count()
print "Creating class1/thread2 (ignored)"
thread2 = test.count()
print "Creating class2/thread3"
thread3 = test2.count()
print "Joining class1/thread1"
thread1.join()
print "Joining class1/thread2"
thread2.join()
print "Joining class2/thread3"
thread3.join()
print "Creating class1/thread4 (its finished, allowed again)"
thread4 = test.count()
print "Joining thread4"
thread4.join()
print thread1.value, thread2.value, thread3.value, thread4.value
print "Done."
def testBenchmark():
import time
def printThreadNum():
import gc
from greenlet import greenlet
objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
print "Greenlets: %s" % len(objs)
printThreadNum()
test = TestNoblock()
s = time.time()
for i in range(3):
gevent.spawn(test.count, i + 1)
print "Created in %.3fs" % (time.time() - s)
printThreadNum()
time.sleep(5)
from gevent import monkey
monkey.patch_all()
testBenchmark()
print "Testing blocking mode..."
testBlocking()
print "Testing noblocking mode..."
testNoblocking()