-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathinterThreadChannels.py
125 lines (101 loc) · 4.06 KB
/
interThreadChannels.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#
# A demonstration of how channels are allow interthread communication.
#
# Author: Richard Tew <[email protected]>
#
# This code was written to serve as an example of Stackless Python usage.
# Feel free to email me with any questions, comments, or suggestions for
# improvement.
#
# FURTHER DETAIL:
#
# Sending or receiving on a channel when there is a waiting tasklet
# from another thread, will always block and schedule the tasklet
# on the other thread. The channel 'preference' attribute has no
# effect on this behaviour. This may not be the most optimal way
# to do sleeping and it might be better to have per-thread
# management of sleepers.
#
import threading
import stackless
import time
# Flag which should allow the threads to exit if cleared.
running = True
# Altered boilerplate Sleep function.
sleepingTasklets = []
sleepCountByThread = {}
threadIDByChannelID = {}
lock = threading.Lock()
def GetThreadID():
return id(threading.currentThread())
def Sleep(secondsToWait):
global sleepingTasklets, lock, threadIDByChannelID, sleepCountByThread
channel = stackless.channel()
endTime = time.time() + secondsToWait
threadID = GetThreadID()
lock.acquire(True)
sleepCountByThread[threadID] = sleepCountByThread.get(threadID, 0) + 1
threadIDByChannelID[id(channel)] = threadID
sleepingTasklets.append((endTime, channel))
sleepingTasklets.sort()
lock.release()
# Block until we get sent an awakening notification.
channel.receive()
def ManageSleepingTasklets(threadID):
global sleepingTasklets, lock, running, threadIDByChannelID, sleepCountByThread
sleepingTasklets = []
while running:
if len(sleepingTasklets):
lock.acquire(True)
endTime = sleepingTasklets[0][0]
if endTime <= time.time():
channel = sleepingTasklets[0][1]
del sleepingTasklets[0]
threadID = threadIDByChannelID[id(channel)]
sleepCountByThread[threadID] -= 1
lock.release()
# We have to send something, but it doesn't matter what as it is not used.
channel.send(None)
else:
lock.release()
elif stackless.getruncount() == 1:
# Give up if there are no more sleeping tasklets. Otherwise the two
# threads keep on running endlessly.
print "Sleeping tasklet exited due to no remaining work."
break
stackless.schedule()
else:
print threadID, "Sleeping tasklet exited due to change in 'running' flag"
# ...
def looping_tasklet(threadID, taskletID):
n = 3
while n > 0:
n -= 1
print threadID, "looping_tasklet", taskletID, "loop", n
Sleep(1.0)
print threadID, "looping_tasklet", taskletID, "exit"
def a_main_tasklet():
global running
threadID = GetThreadID()
for i in range(3):
stackless.tasklet(looping_tasklet)(threadID, i+1)
# We need to catch the keyboard interrupt and signal the other thread to exit.
try:
print threadID, "start: runcount", stackless.getruncount()
# Do a preliminary run to get some tasklets in the scheduler or some
# sleeping tasklets in place, otherwise we would not run at all.
stackless.run()
# Now we should be set to run until we are done.
while running and (stackless.getruncount() > 1 or sleepCountByThread.get(threadID, 0)):
print threadID, "start: runcount", stackless.getruncount(), "sleepcount", sleepCountByThread.get(threadID, 0)
stackless.run()
print threadID, "stop: runcount", stackless.getruncount(), "sleepcount", sleepCountByThread.get(threadID, 0)
except:
running = False
raise
if __name__ == "__main__":
threadID = GetThreadID()
stackless.tasklet(ManageSleepingTasklets)(threadID)
thread = threading.Thread(target=a_main_tasklet)
thread.start()
a_main_tasklet()