forked from HelloZeroNet/ZeroNet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRateLimit.py
129 lines (105 loc) · 4.27 KB
/
RateLimit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import time
import gevent
import logging
log = logging.getLogger("RateLimit")
called_db = {} # Holds events last call time
queue_db = {} # Commands queued to run
# Register event as called
# Return: None
def called(event, penalty=0):
called_db[event] = time.time() + penalty
# Check if calling event is allowed
# Return: True if allowed False if not
def isAllowed(event, allowed_again=10):
last_called = called_db.get(event)
if not last_called: # Its not called before
return True
elif time.time() - last_called >= allowed_again:
del called_db[event] # Delete last call time to save memory
return True
else:
return False
def delayLeft(event, allowed_again=10):
last_called = called_db.get(event)
if not last_called: # Its not called before
return 0
else:
return allowed_again - (time.time() - last_called)
def callQueue(event):
func, args, kwargs, thread = queue_db[event]
log.debug("Calling: %s" % event)
del called_db[event]
del queue_db[event]
return func(*args, **kwargs)
# Rate limit and delay function call if necessary
# If the function called again within the rate limit interval then previous queued call will be dropped
# Return: Immediately gevent thread
def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
if isAllowed(event, allowed_again): # Not called recently, call it now
called(event)
# print "Calling now"
return gevent.spawn(func, *args, **kwargs)
else: # Called recently, schedule it for later
time_left = allowed_again - max(0, time.time() - called_db[event])
log.debug("Added to queue (%.2fs left): %s " % (time_left, event))
if not queue_db.get(event): # Function call not queued yet
thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later
queue_db[event] = (func, args, kwargs, thread)
return thread
else: # Function call already queued, just update the parameters
thread = queue_db[event][3]
queue_db[event] = (func, args, kwargs, thread)
return thread
# Rate limit and delay function call if needed
# Return: Wait for execution/delay then return value
def call(event, allowed_again=10, func=None, *args, **kwargs):
if isAllowed(event): # Not called recently, call it now
called(event)
# print "Calling now", allowed_again
return func(*args, **kwargs)
else: # Called recently, schedule it for later
time_left = max(0, allowed_again - (time.time() - called_db[event]))
# print "Time left: %s" % time_left, args, kwargs
log.debug("Calling sync (%.2fs left): %s" % (time_left, event))
called(event, time_left)
time.sleep(time_left)
back = func(*args, **kwargs)
if event in called_db:
del called_db[event]
return back
# Cleanup expired events every 3 minutes
def rateLimitCleanup():
while 1:
expired = time.time() - 60 * 2 # Cleanup if older than 2 minutes
for event in called_db.keys():
if called_db[event] < expired:
del called_db[event]
time.sleep(60 * 3) # Every 3 minutes
gevent.spawn(rateLimitCleanup)
if __name__ == "__main__":
from gevent import monkey
monkey.patch_all()
import random
def publish(inner_path):
print "Publishing %s..." % inner_path
return 1
def cb(thread):
print "Value:", thread.value
print "Testing async spam requests rate limit to 1/sec..."
for i in range(3000):
thread = callAsync("publish content.json", 1, publish, "content.json %s" % i)
time.sleep(float(random.randint(1, 20)) / 100000)
print thread.link(cb)
print "Done"
time.sleep(2)
print "Testing sync spam requests rate limit to 1/sec..."
for i in range(5):
call("publish data.json", 1, publish, "data.json %s" % i)
time.sleep(float(random.randint(1, 100)) / 100)
print "Done"
print "Testing cleanup"
thread = callAsync("publish content.json single", 1, publish, "content.json single")
print "Needs to cleanup:", called_db, queue_db
print "Waiting 3min for cleanup process..."
time.sleep(60 * 3)
print "Cleaned up:", called_db, queue_db