forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel.jl
210 lines (167 loc) · 6.28 KB
/
parallel.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# NOTE: worker processes cannot add more workers, only the client process can.
# The request to grow the pool is therefore executed on process 1.
require("testdefs.jl")

# When fewer than 3 processes exist, have process 1 add 2 more.
nprocs() < 3 && remotecall_fetch(1, () -> addprocs(2))
# Identify this process and pick one other process at random; several tests
# below pass [id_me, id_other] as their process list.
id_me = myid()
# All ids from procs() except our own, indexed at a random position in 1:(nprocs()-1).
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
# Running myid() on id_other must report id_other.
@test fetch(@spawnat id_other myid()) == id_other
# Same check through @fetchfrom.
# NOTE(review): the layout of this line determines how the macro arguments are
# split, so keep it exactly as written.
@test @fetchfrom id_other begin myid() end == id_other
# Exercise @fetch as well; its result is not checked.
@fetch begin myid() end
# DArray tests: a (200,200) array created by drand over id_me and id_other.
d = drand((200,200), [id_me, id_other])
# Converting a 150x150 slice and converting the whole array must agree.
s = convert(Matrix{Float64}, d[1:150, 1:150])
a = convert(Matrix{Float64}, d)
@test a[1:150,1:150] == s
# The first element of each process's local part is compared with an element of
# the global array: d[1,1] for id_me and d[1,101] for id_other.
@test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1]
@test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101]
# DArray whose initializer fills each requested index block with the id of the
# process that runs it.
d = DArray((10, 10), [id_me, id_other]) do I
    fill(myid(), map(length, I))
end

# Mapping every element to 1 must give 100 ones in total.
d2 = map(x -> 1, d)
@test reduce(+, d2) == 100

# The expected total corresponds to 50 elements holding each of the two ids.
@test reduce(+, d) == ((50 * id_me) + (50 * id_other))

# In-place variant: afterwards d itself must sum to 100.
map!(x -> 1, d)
@test reduce(+, d) == 100
# Shape used by the SharedArray tests that follow.
dims = (20, 20, 20)

@linux_only begin
    # The segment name must start with "/jl", and no entry for it may exist
    # under /dev/shm once the array has been constructed.
    S = SharedArray(Int64, dims)
    @test beginswith(S.segname, "/jl")
    @test !ispath(string("/dev/shm", S.segname))

    # Same checks when the array is created with pids=[id_other].
    S = SharedArray(Int64, dims; pids = [id_other])
    @test beginswith(S.segname, "/jl")
    @test !ispath(string("/dev/shm", S.segname))
end
# TODO: need a similar test of shmem cleanup for OSX
# SharedArray tests
# The sum over the whole array must equal the total of the sums each process
# computes over its own `loc_subarr_1d`.
d = Base.shmem_rand(1:100, dims)
a = convert(Array, d)
partsums = Array(Int, length(procs(d)))
@sync for (slot, pid) in enumerate(procs(d))
    @async partsums[slot] = remotecall_fetch(pid, D -> sum(D.loc_subarr_1d), d)
end
@test sum(a) == sum(partsums)
# Writes must be visible across processes: set the first index of each
# process's local range from here, have that process verify it, write the last
# index of its range, and return the value it stored.
d = Base.shmem_rand(dims)
for pid in procs(d)
    local_idxs = remotecall_fetch(pid, D -> parentindexes(D.loc_subarr_1d)[1], d)
    first_idx = first(local_idxs)
    last_idx = last(local_idxs)
    d[first_idx] = float64(first_idx)
    echoed = remotecall_fetch(pid, (D, lo, hi) -> begin
            assert(D[lo] == float64(lo))
            D[hi] = float64(hi)
            D[hi]
        end, d, first_idx, last_idx)
    @test d[last_idx] == echoed
end
# shmem_fill results must compare equal to ordinary arrays of the same shape and value.
@test ones(10, 10, 10) == Base.shmem_fill(1.0, (10,10,10))
@test zeros(Int32, 10, 10, 10) == Base.shmem_fill(0, (10,10,10))
# copy! between two arrays produced by shmem_rand: destination must equal source.
d = Base.shmem_rand(dims)
s = Base.shmem_rand(dims)
copy!(s, d)
@test s == d
# copy! from sdata(d) into a fresh destination must give the same result.
s = Base.shmem_rand(dims)
copy!(s, sdata(d))
@test s == d
# With an `init` function that fills each process's `loc_subarr_1d` with
# myid(), both ends of every process's local index range must hold that
# process's id.
d = SharedArray(Int, dims; init = D -> fill!(D.loc_subarr_1d, myid()))
for pid in procs(d)
    owned = remotecall_fetch(pid, D -> parentindexes(D.loc_subarr_1d)[1], d)
    @test d[first(owned)] == pid
    @test d[last(owned)] == pid
end
# issue #6362
# A deep copy of a SharedArray must start out equal to the original.
d = Base.shmem_rand(dims)
s = copy(sdata(d))
ds = deepcopy(d)
@test ds == d

# Overwrite indices 1:10 of the deep copy from a process other than this one:
# the copy must then differ from d, while d still matches its snapshot s.
pids_ds = procs(ds)
let remote_pid = pids_ds[findfirst(id -> (id != myid()), pids_ds)]
    remotecall_fetch(remote_pid, setindex!, ds, 1.0, 1:10)
end
@test ds != d
@test s == d
# SharedArray as an array
# Since the data in d will depend on the nprocs, just test that these operations work
# Linear range indexing; the result is not inspected.
a = d[1:5]
# An index range starting at -1 must raise BoundsError.
@test_throws BoundsError d[-1:5]
# Strided indexing along the third dimension using `end`; result not inspected.
a = d[1,1,1:3:end]
# Scalar assignment through a linear range and through a mixed scalar/strided index.
d[2:4] = 7
d[5,1:2:4,8] = 19
# A matrix and `AA'` converted to SharedArray must multiply to the same result
# as the plain arrays.
AA = rand(4,2)
A = convert(SharedArray, AA)
B = convert(SharedArray, AA')
@test B*A == AA'*AA
# SharedArray over id_me and id_other whose `init` fills each process's
# `loc_subarr_1d` with that process's id.
d = SharedArray(Int64, (10, 10);
                init = D -> fill!(D.loc_subarr_1d, myid()),
                pids = [id_me, id_other])

# Mapping every element to 1 must give 100 ones in total.
d2 = map(x -> 1, d)
@test reduce(+, d2) == 100

# The expected total corresponds to 50 elements holding each of the two ids.
@test reduce(+, d) == ((50 * id_me) + (50 * id_other))

# In-place variant: afterwards d itself must sum to 100.
map!(x -> 1, d)
@test reduce(+, d) == 100
# Boundary cases where length(S) <= length(pids)
# Two elements over two pids: id_other reads index 2.
@test 2.0 == remotecall_fetch(
    id_other,
    arr -> arr[2],
    Base.shmem_fill(2.0, 2; pids = [id_me, id_other]))
# One element over two pids: id_other reads index 1.
@test 3.0 == remotecall_fetch(
    id_other,
    arr -> arr[1],
    Base.shmem_fill(3.0, 1; pids = [id_me, id_other]))
# Test @parallel load balancing - all processors should get either M or M+1
# iterations out of the loop range for some M.
# When fewer than 4 processes exist, have process 1 add the missing ones.
nprocs() < 4 && remotecall_fetch(1, () -> addprocs(4 - nprocs()))

# Each of the 7 iterations yields the id of the process that ran it; partial
# results are combined with `[x, y]`, and the second element of hist's result
# is taken as the per-process workload.
workloads = hist(@parallel((x, y) -> [x, y], for k=1:7; myid(); end), nprocs())[2]
@test maximum(workloads) - minimum(workloads) <= 1

# @parallel reduction should work even with very short ranges
@test @parallel(+, for k=1:2; k; end) == 3
# Testing timedwait on multiple RemoteRefs
rr1 = RemoteRef()
rr2 = RemoteRef()
rr3 = RemoteRef()
# Fill the three refs after sleeping 0.5, 1.0 and 2.0 seconds respectively.
@async begin sleep(0.5); put!(rr1, :ok) end
@async begin sleep(1.0); put!(rr2, :ok) end
@async begin sleep(2.0); put!(rr3, :ok) end
# Time a timedwait(1.0) whose condition requires all three refs to be ready;
# rr3 is only filled after 2.0 seconds, so the elapsed time is expected to be
# close to the 1.0 second limit.
tic()
timedwait(1.0) do
all(map(isready, [rr1, rr2, rr3]))
end
et=toq()
# assuming that 0.5 seconds is a good enough buffer on a typical modern CPU
# If either timing check throws, the catch block emits a warning instead of
# letting the error propagate.
try
@test (et >= 1.0) && (et <= 1.5)
@test !isready(rr3)
catch
warn("timedwait tests delayed. et=$et, isready(rr3)=$(isready(rr3))")
end
# This check sits outside the try block, so a failure here is not downgraded
# to a warning.
@test isready(rr1)
# TODO: The below block should be always enabled but the error is printed by the event loop
# Hence in the event of any relevant changes to the parallel codebase,
# please define an ENV variable PTEST_FULL and ensure that the below block is
# executed successfully before committing/merging
if haskey(ENV, "PTEST_FULL")
    println("START of parallel tests that print errors")

    # make sure exceptions propagate when waiting on Tasks
    @test_throws ErrorException (@sync (@async error("oops")))

    # pmap tests
    # needs at least 4 processors (which are being created above for the @parallel tests)
    s = "a" * "bcdefghijklmnopqrstuvwxyz"^100
    ups = "A" * "BCDEFGHIJKLMNOPQRSTUVWXYZ"^100
    @test ups == bytestring(UInt8[uint8(c) for c in pmap(ch -> uppercase(ch), s)])
    @test ups == bytestring(UInt8[uint8(c) for c in pmap(b -> uppercase(char(b)), s.data)])

    # retry, on error exit
    res = pmap(ch -> (ch == 'a') ? error("test error. don't panic.") : uppercase(ch), s;
               err_retry = true, err_stop = true)
    @test length(res) < length(ups)
    @test isa(res[1], Exception)

    # no retry, on error exit
    res = pmap(ch -> (ch == 'a') ? error("test error. don't panic.") : uppercase(ch), s;
               err_retry = false, err_stop = true)
    @test length(res) < length(ups)
    @test isa(res[1], Exception)

    # retry, on error continue
    # Here the failure depends on the parity of the executing process's id
    # rather than on the input character.
    res = pmap(ch -> iseven(myid()) ? error("test error. don't panic.") : uppercase(ch), s;
               err_retry = true, err_stop = false)
    @test length(res) == length(ups)
    @test ups == bytestring(UInt8[uint8(c) for c in res])

    # no retry, on error continue
    res = pmap(ch -> (ch == 'a') ? error("test error. don't panic.") : uppercase(ch), s;
               err_retry = false, err_stop = false)
    @test length(res) == length(ups)
    @test isa(res[1], Exception)

    println("END of parallel tests that print errors")
end
# issue #7727
# Two tasks iterate over the same task, which produces the single value 11;
# the test requires that exactly one of the two collectors ends up as [11].
let A = [], B = []
    producer = @task produce(11)
    @sync begin
        @async for item in producer
            push!(A, item)
        end
        @async for item in producer
            push!(B, item)
        end
    end
    @test (A == [11]) != (B == [11])
end