-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathlazy.py
169 lines (163 loc) · 5.4 KB
/
lazy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from multiprocessing import Pool, cpu_count
from threading import Lock
from operator import add, sub, mul, truediv, getitem
import time, weakref
def lazy(func):
new_name = '_lazy_'+func.__name__
globals()[new_name] = func
func.__name__ = new_name
func.__module__ = __name__
lazyfunc = Lazy(func)
return lazyfunc
class Lazy(object):
POOL = None
JOBS = weakref.WeakSet()
CACHES = weakref.WeakValueDictionary()
ACTIVE = 0
CORES = cpu_count()
@classmethod
def set_cores(cls, num):
cls.CORES = num
def __init__(self, operator, remote=True, format='{0}({1})', debug=False):
self.operator = operator
self.remote = remote
self.format = format
self.func = True
self.depth = -1
self.args = None
self.waitlock = None
self.debug = debug
def call(self, *args):
cache_key = (self.operator, args)
if cache_key in Lazy.CACHES:
return Lazy.CACHES[cache_key]
Lazy.CACHES[cache_key] = self
self.func = False
self.waitings = []
self.value = None
self.argscount = 0
self.argsready = 0
self.runtime = self.waittime = 0
self.startrun = self.startwait = 0
self.startwait = time.time()
self.args = args
for i in self.args:
if isinstance(i, Lazy):
if i.value is None:
self.argscount += 1
i.waitings.append(self)
if i.debug:
self.debug = True
if self.argscount == 0:
if self.remote:
Lazy.JOBS.add(self)
else:
self.startrun = time.time()
args = tuple(getattr(i, 'value', i) for i in self.args)
self.value = self.operator(*args)
self.runtime = time.time() - self.startrun
return self
def setdepth(self, l=0):
for i in self.args:
if isinstance(i, Lazy):
i.setdepth(l+(1 if i.remote else 0))
self.depth = max(self.depth, l)
def __call__(self, *args):
s = Lazy(self.operator, self.remote, self.format, self.debug)
return s.call(*args)
def __add__(self, a):
return ADD(self, a)
def __radd__(self, a):
return ADD(a, self)
def __sub__(self, a):
return SUB(self, a)
def __rsub__(self, a):
return SUB(a, self)
def __mul__(self, a):
return MUL(self, a)
def __rmul__(self, a):
return MUL(a, self)
def __truediv__(self, a):
return DIV(self, a)
def __rtruediv__(self, a):
return DIV(a, self)
def __pow__(self, a):
return POW(self, a)
def __rpow__(self, a):
return POW(a, self)
def __getitem__(self, idx):
return GETITEM(self, idx)
def __eq__(self, other):
return self.operator == other.operator and self.args == other.args
def __hash__(self):
return hash((self.operator, self.args))
def opname(self):
if self.operator.__name__.startswith('_lazy_'):
return self.operator.__name__[6:]
else:
return self.operator.__name__
def __str__(self):
if self.func:
return 'Lazy(%s)' % (self.opname(),)
elif self.value is None:
return self.format.format(self.opname(), ', '.join(map(str, self.args)), *self.args)
else:
return str(self.value)
def __repr__(self):
return self.__str__()
def argsfinish(self):
self.argsready += 1
if self.argsready >= self.argscount:
Lazy.JOBS.add(self)
def finish(self, value):
self.runtime = time.time() - self.startrun
if self.debug:
print(self, '==', value)
self.value = value
for i in self.waitings[:]:
i.argsfinish()
del self.waitings[:]
if self.remote:
Lazy.ACTIVE -= 1
self.schedule()
if self.waitlock:
self.waitlock.release()
def startjob(self):
self.waittime = time.time() - self.startwait
self.startrun = time.time()
args = tuple(getattr(i, 'value', i) for i in self.args)
Lazy.JOBS.discard(self)
if self.remote:
Lazy.ACTIVE += 1
if Lazy.POOL:
Lazy.POOL.apply_async(self.operator, args, callback = self.finish)
else:
self.finish(self.operator(*args))
else:
self.finish(self.operator(*args))
def eval(self):
if self.value is not None:
return self.value
if Lazy.POOL is None and Lazy.CORES > 0:
Lazy.POOL = Pool(Lazy.CORES)
self.remote = False
self.setdepth(0)
self.waitlock = Lock()
self.waitlock.acquire()
self.schedule()
self.waitlock.acquire()
return self.value
@classmethod
def schedule(cls):
while cls.JOBS:
next = max(list(cls.JOBS), key=lambda x:x.depth)
if next.depth == -1 or next.remote and cls.CORES > 0 and cls.ACTIVE >= cls.CORES:
break
next.startjob()
ADD = Lazy(add, remote=True, format='({2}+{3})')
SUB = Lazy(sub, remote=True, format='({2}-{3})')
MUL = Lazy(mul, format='({2}*{3})')
DIV = Lazy(truediv, format='({2}/{3})')
POW = Lazy(pow, format='({2}**{3})')
GETITEM = Lazy(getitem, remote=False, format='{2}[{3}]')
INT = Lazy(int, remote=False, format='{2}')