-
Notifications
You must be signed in to change notification settings - Fork 212
/
Copy pathtestdurable.py
128 lines (96 loc) · 3.82 KB
/
testdurable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from durable.lang import *
import redis
import time
import datetime
import traceback
import sys
def unix_time(dt):
epoch = datetime.datetime.utcfromtimestamp(0)
delta = dt - epoch
return int(delta.total_seconds() * 1000.0)
def provide_durability(host, redis_host_name = 'localhost', port = 6379):
r = redis.StrictRedis(redis_host_name, port, charset="utf-8", decode_responses=True)
def get_hset_name(ruleset, sid):
return 'h!{0}!{1}'.format(ruleset, sid)
def get_list_name(ruleset, sid):
return 'l!{0}!{1}'.format(ruleset, sid)
def get_sset_name(ruleset):
return 's!{0}'.format(ruleset)
def format_message(action_type, content):
return '{0},{1}'.format(action_type, content)
def format_messages(results):
if not results:
return '[]'
messages = ['[']
for i in range(0, len(results)):
messages.append(results[i])
if i < (len(results) - 1):
messages.append(',')
messages.append(']')
return ''.join(messages)
def store_message_callback(ruleset, sid, mid, action_type, content):
try:
r.hset(get_hset_name(ruleset, sid), mid, format_message(action_type, content))
except BaseException as e:
print(e)
return 601
return 0
def delete_message_callback(ruleset, sid, mid):
try:
r.hdel(get_hset_name(ruleset, sid), mid)
except BaseException as e:
print(e)
return 602
return 0
def queue_message_callback(ruleset, sid, action_type, content):
try:
result = r.zscore(get_sset_name(ruleset), sid)
if not result:
r.zadd(get_sset_name(ruleset), {sid: unix_time(datetime.datetime.now())})
r.rpush(get_list_name(ruleset, sid), format_message(action_type, content))
except BaseException as e:
print(e)
return 603
return 0
def get_queued_messages_callback(ruleset, sid):
try:
r.zadd(get_sset_name(ruleset), {sid: unix_time(datetime.datetime.now()) + 5000})
messages = r.lrange(get_list_name(ruleset, sid), 0, -1)
if len(messages):
r.delete(get_list_name(ruleset, sid))
host.complete_get_queued_messages(ruleset, sid, format_messages(messages))
except BaseException as e:
print(e)
return 604
return 0
def get_idle_state_callback(ruleset):
try:
results = r.zrangebyscore(get_sset_name(ruleset), 0, unix_time(datetime.datetime.now()))
if (results and len(results)):
sid = results[0]
messages = r.hvals(get_hset_name(ruleset, sid))
host.complete_get_idle_state(ruleset, sid, format_messages(messages))
except BaseException as e:
print(e)
return 606
return 0
host.set_store_message_callback(store_message_callback)
host.set_delete_message_callback(delete_message_callback)
host.set_queue_message_callback(queue_message_callback)
host.set_get_idle_state_callback(get_idle_state_callback)
host.set_get_queued_messages_callback(get_queued_messages_callback)
provide_durability(get_host())
with ruleset('test'):
@when_all(c.first << m.subject == 'Hello',
c.second << m.subject == 'World')
def say_hello(c):
if not c.s.count:
c.s.count = 1
else:
c.s.count += 1
print('{0} {1} from: {2} count: {3}'.format(c.first.subject, c.second.subject, c.s.sid, c.s.count))
post('test', { 'subject': 'Hello', 'sid': 'a' })
post('test', { 'subject': 'World', 'sid': 'b' })
post('test', { 'subject': 'Hello', 'sid': 'b' })
post('test', { 'subject': 'World', 'sid': 'a' })
time.sleep(30)