forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpreduce.jl
46 lines (39 loc) · 1.1 KB
/
preduce.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
importall Base
# figure 5.2 from principles of parallel programming, ported to julia.
# sum a vector using a tree on top of local reductions.
function sum(v::DArray)
P = procs(v)
nodeval = [RemoteRef(p) for p=P]
answer = RemoteRef()
np = numel(P)
for i=0:np-1
@spawnat P[i+1] begin
stride=1
tally = sum(localpart(v))
while stride < np
if i%(2*stride) == 0
tally = tally + take(nodeval[i+stride])
stride = 2*stride
else
put(nodeval[i], tally)
break
end
end
if i==0
put(answer, tally)
end
end
end
fetch(answer)
end
function reduce(f, v::DArray)
mapreduce(fetch, f,
[ @spawnat p reduce(f,localpart(v)) for p = procs(v) ])
end
# possibly-useful abstraction:
type RefGroup
refs::Array{RemoteRef,1}
RefGroup(P) = new([ RemoteRef(p) for p=P ])
end
getindex(r::RefGroup, i) = fetch(r.refs[i])
setindex!(r::RefGroup, v, i) = put(r.refs[i], v)