# This file is a part of Julia. License is MIT: http://julialang.org/license
using Base.Test
inline_flag = Base.JLOptions().can_inline == 1 ? `` : `--inline=no`
cov_flag = ``
if Base.JLOptions().code_coverage == 1
    cov_flag = `--code-coverage=user`
elseif Base.JLOptions().code_coverage == 2
    cov_flag = `--code-coverage=all`
end
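# These flags mirror the parent process's own options so that the workers
# added below (via the exeflags keyword of addprocs) run with the same
# inlining and code-coverage configuration; bounds checking and depwarn
# are forced explicitly.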
# Test a few "remote" invocations when no workers are present
@test remote(myid)() == 1
@test pmap(identity, 1:100) == [1:100...]
@test 100 == @parallel (+) for i in 1:100
    1
end
addprocs(4; exeflags=`$cov_flag $inline_flag --check-bounds=yes --startup-file=no --depwarn=error`)
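# The pmap keyword-combination tests further below need at least 4 dedicated
# worker processes, hence 4 workers are added here.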
# Test remote()
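# remote(f) returns an anonymous function that runs f on a free worker from
# the default worker pool; while all workers are busy the pool is !isready,
# which is what the counting below checks.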
let
    pool = Base.default_worker_pool()
    count = 0
    count_condition = Condition()
    function remote_wait(c)
        @async begin
            count += 1
            remote(take!)(c)
            count -= 1
            notify(count_condition)
        end
        yield()
    end
    testchannels = [RemoteChannel() for i in 1:nworkers()]
    testcount = 0
    @test isready(pool) == true
    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false
    for c in testchannels
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end
    @test count == 0
    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false
    for c in reverse(testchannels)
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end
    @test count == 0
end
id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
# Test Futures
function testf(id)
    f = Future(id)
    @test isready(f) == false
    @test isnull(f.v) == true
    put!(f, :OK)
    @test isready(f) == true
    @test isnull(f.v) == false
    @test_throws ErrorException put!(f, :OK) # Cannot put! to an already-set Future
    @test_throws MethodError take!(f)        # take! is unsupported on a Future
    @test fetch(f) == :OK
end
testf(id_me)
testf(id_other)
# Distributed GC tests for Futures
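# (finalize on a remote reference enqueues a delete message to the owning
# worker; Base.flush_gc_msgs() flushes those messages so the owner's refs
# table can be inspected deterministically.)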
function test_futures_dgc(id)
    f = remotecall(myid, id)
    fid = Base.remoteref_id(f)

    # remote value should be deleted after a fetch
    @test remotecall_fetch(k->(yield();haskey(Base.PGRP.refs, k)), id, fid) == true
    @test isnull(f.v) == true
    @test fetch(f) == id
    @test isnull(f.v) == false
    @test remotecall_fetch(k->(yield();haskey(Base.PGRP.refs, k)), id, fid) == false

    # if unfetched, it should be deleted after a finalize
    f = remotecall(myid, id)
    fid = Base.remoteref_id(f)
    @test remotecall_fetch(k->(yield();haskey(Base.PGRP.refs, k)), id, fid) == true
    @test isnull(f.v) == true
    finalize(f)
    Base.flush_gc_msgs()
    @test remotecall_fetch(k->(yield();haskey(Base.PGRP.refs, k)), id, fid) == false
end
test_futures_dgc(id_me)
test_futures_dgc(id_other)
# if sent to another worker, it should not be deleted till the other worker has fetched.
wid1 = workers()[1]
wid2 = workers()[2]
f = remotecall(myid, wid1)
fid = Base.remoteref_id(f)
fstore = RemoteChannel(wid2)
put!(fstore, f)
@test fetch(f) == wid1
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true
remotecall_fetch(r->fetch(fetch(r)), wid2, fstore)
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false
# put! should release remote reference since it would have been cached locally
f = Future(wid1)
fid = Base.remoteref_id(f)
# should not be created remotely till accessed
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false
# create it remotely
isready(f)
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true
put!(f, :OK)
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false
@test fetch(f) == :OK
# RemoteException should be thrown on a put! when another process has set the value
f = Future(wid1)
fid = Base.remoteref_id(f)
fstore = RemoteChannel(wid2)
put!(fstore, f) # send f to wid2
put!(f, :OK) # set value from master
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true
testval = remotecall_fetch(wid2, fstore) do x
    try
        put!(fetch(x), :OK)
        return 0
    catch e
        if isa(e, RemoteException)
            return 1
        else
            return 2
        end
    end
end
@test testval == 1
# Distributed GC tests for RemoteChannels
function test_remoteref_dgc(id)
    rr = RemoteChannel(id)
    put!(rr, :OK)
    rrid = Base.remoteref_id(rr)

    # remote value should be deleted after finalizing the ref
    @test remotecall_fetch(k->(yield();haskey(Base.PGRP.refs, k)), id, rrid) == true
    @test fetch(rr) == :OK
    @test remotecall_fetch(k->(yield();haskey(Base.PGRP.refs, k)), id, rrid) == true
    finalize(rr)
    Base.flush_gc_msgs()
    @test remotecall_fetch(k->(yield();haskey(Base.PGRP.refs, k)), id, rrid) == false
end
test_remoteref_dgc(id_me)
test_remoteref_dgc(id_other)
# if sent to another worker, it should not be deleted till the other worker has also finalized.
wid1 = workers()[1]
wid2 = workers()[2]
rr = RemoteChannel(wid1)
rrid = Base.remoteref_id(rr)
fstore = RemoteChannel(wid2)
put!(fstore, rr)
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true
finalize(rr); Base.flush_gc_msgs() # finalize locally
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true
remotecall_fetch(r->(finalize(take!(r)); Base.flush_gc_msgs(); nothing), wid2, fstore) # finalize remotely
sleep(0.5) # to ensure that wid2 messages have been executed on wid1
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == false
@test fetch(@spawnat id_other myid()) == id_other
@test @fetchfrom id_other begin myid() end == id_other
@fetch begin myid() end
# test getindex on Futures and RemoteChannels
function test_indexing(rr)
    a = rand(5,5)
    put!(rr, a)
    @test rr[2,3] == a[2,3]
    @test rr[] == a
end
test_indexing(Future())
test_indexing(Future(id_other))
test_indexing(RemoteChannel())
test_indexing(RemoteChannel(id_other))
dims = (20,20,20)
if is_linux()
    S = SharedArray(Int64, dims)
    @test startswith(S.segname, "/jl")
    @test !ispath("/dev/shm" * S.segname)

    S = SharedArray(Int64, dims; pids=[id_other])
    @test startswith(S.segname, "/jl")
    @test !ispath("/dev/shm" * S.segname)
end
# TODO : Need a similar test of shmem cleanup for OSX
##### SharedArray tests
function check_pids_all(S::SharedArray)
    pidtested = falses(size(S))
    for p in procs(S)
        idxes_in_p = remotecall_fetch(p, S) do D
            parentindexes(D.loc_subarr_1d)[1]
        end
        @test all(sdata(S)[idxes_in_p] .== p)
        pidtested[idxes_in_p] = true
    end
    @test all(pidtested)
end
d = Base.shmem_rand(1:100, dims)
a = convert(Array, d)
partsums = Array{Int}(length(procs(d)))
@sync begin
    for (i, p) in enumerate(procs(d))
        @async partsums[i] = remotecall_fetch(p, d) do D
            sum(D.loc_subarr_1d)
        end
    end
end
@test sum(a) == sum(partsums)
d = Base.shmem_rand(dims)
for p in procs(d)
    idxes_in_p = remotecall_fetch(p, d) do D
        parentindexes(D.loc_subarr_1d)[1]
    end
    idxf = first(idxes_in_p)
    idxl = last(idxes_in_p)
    d[idxf] = Float64(idxf)
    rv = remotecall_fetch(p, d, idxf, idxl) do D, idxf, idxl
        assert(D[idxf] == Float64(idxf))
        D[idxl] = Float64(idxl)
        D[idxl]
    end
    @test d[idxl] == rv
end
@test ones(10, 10, 10) == Base.shmem_fill(1.0, (10,10,10))
@test zeros(Int32, 10, 10, 10) == Base.shmem_fill(0, (10,10,10))
d = Base.shmem_rand(dims)
s = Base.shmem_rand(dims)
copy!(s, d)
@test s == d
s = Base.shmem_rand(dims)
copy!(s, sdata(d))
@test s == d
a = rand(dims)
@test sdata(a) == a
d = SharedArray(Int, dims, init = D->fill!(D.loc_subarr_1d, myid()))
for p in procs(d)
    idxes_in_p = remotecall_fetch(p, d) do D
        parentindexes(D.loc_subarr_1d)[1]
    end
    idxf = first(idxes_in_p)
    idxl = last(idxes_in_p)
    @test d[idxf] == p
    @test d[idxl] == p
end
d = @inferred(SharedArray(Float64, (2,3)))
@test isa(d[:,2], Vector{Float64})
### SharedArrays from a file
# Mapping an existing file
fn = tempname()
write(fn, 1:30)
sz = (6,5)
Atrue = reshape(1:30, sz)
S = @inferred(SharedArray(fn, Int, sz))
@test S == Atrue
@test length(procs(S)) > 1
@sync begin
    for p in procs(S)
        @async remotecall_wait(p, S) do D
            fill!(D.loc_subarr_1d, myid())
        end
    end
end
check_pids_all(S)
filedata = similar(Atrue)
read!(fn, filedata)
@test filedata == sdata(S)
finalize(S)
# Error for write-only files
@test_throws ArgumentError SharedArray(fn, Int, sz, mode="w")
# Error when the file doesn't exist but creating it is not allowed
@test_throws ArgumentError SharedArray(joinpath(tempdir(),randstring()), Int, sz, mode="r")
# Creating a new file
fn2 = tempname()
S = SharedArray(fn2, Int, sz, init=D->D[localindexes(D)] = myid())
@test S == filedata
filedata2 = similar(Atrue)
read!(fn2, filedata2)
@test filedata == filedata2
finalize(S)
# Appending to a file
fn3 = tempname()
write(fn3, ones(UInt8, 4))
S = SharedArray(fn3, UInt8, sz, 4, mode="a+", init=D->D[localindexes(D)]=0x02)
len = prod(sz)+4
@test filesize(fn3) == len
filedata = Array{UInt8}(len)
read!(fn3, filedata)
@test all(filedata[1:4] .== 0x01)
@test all(filedata[5:end] .== 0x02)
finalize(S)
# call gc 3 times to avoid unlink: operation not permitted (EPERM) on Windows
S = nothing
@everywhere gc()
@everywhere gc()
@everywhere gc()
rm(fn); rm(fn2); rm(fn3)
### Utility functions
# constructor variants from PR #13514
S = @inferred(SharedArray{Int}((1,2,3)))
@test size(S) == (1,2,3)
@test typeof(S) <: SharedArray{Int}
S = @inferred(SharedArray{Int}(2))
@test size(S) == (2,)
@test typeof(S) <: SharedArray{Int}
S = @inferred(SharedArray{Int}(1,2))
@test size(S) == (1,2)
@test typeof(S) <: SharedArray{Int}
S = @inferred(SharedArray{Int}(1,2,3))
@test size(S) == (1,2,3)
@test typeof(S) <: SharedArray{Int}
# reshape
d = Base.shmem_fill(1.0, (10,10,10))
@test ones(100, 10) == reshape(d,(100,10))
d = Base.shmem_fill(1.0, (10,10,10))
@test_throws DimensionMismatch reshape(d,(50,))
# rand, randn
d = Base.shmem_rand(dims)
@test size(rand!(d)) == dims
d = Base.shmem_fill(1.0, dims)
@test size(randn!(d)) == dims
# similar
d = Base.shmem_rand(dims)
@test size(similar(d, Complex128)) == dims
@test size(similar(d, dims)) == dims
# issue #6362
d = Base.shmem_rand(dims)
s = copy(sdata(d))
ds = deepcopy(d)
@test ds == d
pids_d = procs(d)
remotecall_fetch(setindex!, pids_d[findfirst(id->(id != myid()), pids_d)], d, 1.0, 1:10)
@test ds != d
@test s != d
copy!(d, s)
@everywhere setid!(A) = A[localindexes(A)] = myid()
@sync for p in procs(ds)
    @async remotecall_wait(setid!, p, ds)
end
@test d == s
@test ds != s
@test first(ds) == first(procs(ds))
@test last(ds) == last(procs(ds))
# SharedArray as an array
# Since the data in d will depend on the nprocs, just test that these operations work
a = d[1:5]
@test_throws BoundsError d[-1:5]
a = d[1,1,1:3:end]
d[2:4] = 7
d[5,1:2:4,8] = 19
AA = rand(4,2)
A = @inferred(convert(SharedArray, AA))
B = @inferred(convert(SharedArray, AA'))
@test B*A == ctranspose(AA)*AA
d=SharedArray(Int64, (10,10); init = D->fill!(D.loc_subarr_1d, myid()), pids=[id_me, id_other])
d2 = map(x->1, d)
@test reduce(+, d2) == 100
@test reduce(+, d) == ((50*id_me) + (50*id_other))
map!(x->1, d)
@test reduce(+, d) == 100
@test fill!(d, 1) == ones(10, 10)
@test fill!(d, 2.) == fill(2, 10, 10)
@test d[:] == fill(2, 100)
@test d[:,1] == fill(2, 10)
@test d[1,:] == fill(2, 10)
# Boundary cases where length(S) <= length(pids)
@test 2.0 == remotecall_fetch(D->D[2], id_other, Base.shmem_fill(2.0, 2; pids=[id_me, id_other]))
@test 3.0 == remotecall_fetch(D->D[1], id_other, Base.shmem_fill(3.0, 1; pids=[id_me, id_other]))
# Shared arrays of singleton immutables
@everywhere immutable ShmemFoo end
for T in [Void, ShmemFoo]
    s = @inferred(SharedArray(T, 10))
    @test T() === remotecall_fetch(x->x[3], workers()[1], s)
end
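# (These element types are zero-size singletons, so this mainly exercises the
# SharedArray metadata and fetch path rather than the shared memory contents.)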
# Issue #14664
d = SharedArray(Int,10)
@sync @parallel for i=1:10
    d[i] = i
end
for (x,i) in enumerate(d)
    @test x == i
end
# complex
sd = SharedArray(Int,10)
se = SharedArray(Int,10)
@sync @parallel for i=1:10
    sd[i] = i
    se[i] = i
end
sc = complex(sd,se)
for (x,i) in enumerate(sc)
    @test i == complex(x,x)
end
# Once finalized accessing remote references and shared arrays should result in exceptions.
function finalize_and_test(r)
    finalize(r)
    @test_throws ErrorException fetch(r)
end
for id in [id_me, id_other]
    finalize_and_test(Future(id))
    finalize_and_test((r=Future(id); put!(r, 1); r))
    finalize_and_test(RemoteChannel(id))
    finalize_and_test((r=RemoteChannel(id); put!(r, 1); r))
end
d = SharedArray(Int,10)
finalize(d)
@test_throws BoundsError d[1]
# Test @parallel load balancing - all processors should get either M or M+1
# iterations out of the loop range for some M.
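# e.g. with 4 workers and a range of length 7, the chunk sizes are some
# permutation of [2,2,2,1], so maximum - minimum <= 1.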
ids = @parallel((a,b)->[a;b], for i=1:7; myid(); end)
workloads = Int[sum(ids .== i) for i in 2:nprocs()]
@test maximum(workloads) - minimum(workloads) <= 1
# @parallel reduction should work even with very short ranges
@test @parallel(+, for i=1:2; i; end) == 3
# Testing timedwait on multiple channels
@sync begin
    rr1 = Channel()
    rr2 = Channel()
    rr3 = Channel()

    callback() = all(map(isready, [rr1, rr2, rr3]))
    # precompile functions which will be tested for execution time
    @test !callback()
    @test timedwait(callback, 0.0) === :timed_out

    @async begin sleep(0.5); put!(rr1, :ok) end
    @async begin sleep(1.0); put!(rr2, :ok) end
    @async begin sleep(2.0); put!(rr3, :ok) end

    tic()
    timedwait(callback, 1.0)
    et = toq()
    # assuming that 0.5 seconds is a good enough buffer on a typical modern CPU
    try
        @test (et >= 1.0) && (et <= 1.5)
        @test !isready(rr3)
    catch
        warn("timedwait tests delayed. et=$et, isready(rr3)=$(isready(rr3))")
    end
    @test isready(rr1)
end
# Test multiple concurrent put!/take! on a channel
function testcpt()
    c = Channel()
    size = 0
    inc() = size += 1
    dec() = size -= 1
    @sync for i = 1:10^4
        @async (sleep(rand()); put!(c, i); inc())
        @async (sleep(rand()); take!(c); dec())
    end
    @test size == 0
end
testcpt()
# Test multiple "for" loops waiting on the same channel which
# is closed after adding a few elements.
c=Channel()
results=[]
@sync begin
    for i in 1:20
        @async for i in c
            push!(results, i)
        end
    end
    sleep(1.0)
    for i in 1:5
        put!(c,i)
    end
    close(c)
end
@test sum(results) == 15
@test_throws ArgumentError sleep(-1)
@test_throws ArgumentError timedwait(()->false, 0.1, pollint=-0.5)
# specify pids for pmap
@test sort(workers()[1:2]) == sort(unique(pmap(WorkerPool(workers()[1:2]), x->(sleep(0.1);myid()), 1:10)))
# Testing buffered and unbuffered reads
# This large array should write directly to the socket
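# (Bitstype arrays can be streamed to the socket without an intermediate copy,
# while arrays of non-bitstype elements such as strings are serialized into a
# buffer first; both paths should round-trip identically.)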
a = ones(10^6)
@test a == remotecall_fetch((x)->x, id_other, a)
# Not a bitstype, should be buffered
s = [randstring() for x in 1:10^5]
@test s == remotecall_fetch((x)->x, id_other, s)
# large number of small requests
num_small_requests = 10000
@test fill(id_other, num_small_requests) == [remotecall_fetch(myid, id_other) for i in 1:num_small_requests]
# test parallel sends of large arrays from multiple tasks to the same remote worker
ntasks = 10
rr_list = [Channel() for x in 1:ntasks]
a = ones(2*10^5)
for rr in rr_list
    @async let rr=rr
        try
            for i in 1:10
                @test a == remotecall_fetch((x)->x, id_other, a)
                yield()
            end
            put!(rr, :OK)
        catch
            put!(rr, :ERROR)
        end
    end
end
@test [fetch(rr) for rr in rr_list] == [:OK for x in 1:ntasks]
function test_channel(c)
    put!(c, 1)
    put!(c, "Hello")
    put!(c, 5.0)

    @test isready(c) == true
    @test fetch(c) == 1
    @test fetch(c) == 1   # Should not have been popped previously
    @test take!(c) == 1
    @test take!(c) == "Hello"
    @test fetch(c) == 5.0
    @test take!(c) == 5.0
    @test isready(c) == false
    close(c)
end
test_channel(Channel(10))
test_channel(RemoteChannel(()->Channel(10)))
c=Channel{Int}(1)
@test_throws MethodError put!(c, "Hello")
# test channel iterations
function test_iteration(in_c, out_c)
    t = @schedule for v in in_c
        put!(out_c, v)
    end

    isa(in_c, Channel) && @test isopen(in_c) == true
    put!(in_c, 1)
    @test take!(out_c) == 1
    put!(in_c, "Hello")
    close(in_c)
    @test take!(out_c) == "Hello"
    isa(in_c, Channel) && @test isopen(in_c) == false
    @test_throws InvalidStateException put!(in_c, :foo)
    yield()
    @test istaskdone(t) == true
end
test_iteration(Channel(10), Channel(10))
# make sure exceptions propagate when waiting on Tasks
@test_throws CompositeException (@sync (@async error("oops")))
try
    @sync begin
        for i in 1:5
            @async error(i)
        end
    end
    error("unexpected")
catch ex
    @test typeof(ex) == CompositeException
    @test length(ex) == 5
    @test typeof(ex.exceptions[1]) == CapturedException
    @test typeof(ex.exceptions[1].ex) == ErrorException
    errors = map(x->x.ex.msg, ex.exceptions)
    @test collect(1:5) == sort(map(x->parse(Int, x), errors))
end
function test_remoteexception_thrown(expr)
    try
        expr()
        error("unexpected")
    catch ex
        @test typeof(ex) == RemoteException
        @test typeof(ex.captured) == CapturedException
        @test typeof(ex.captured.ex) == ErrorException
        @test ex.captured.ex.msg == "foobar"
    end
end
for id in [id_other, id_me]
    test_remoteexception_thrown() do
        remotecall_fetch(id) do
            throw(ErrorException("foobar"))
        end
    end
    test_remoteexception_thrown() do
        remotecall_wait(id) do
            throw(ErrorException("foobar"))
        end
    end
    test_remoteexception_thrown() do
        wait(remotecall(id) do
            throw(ErrorException("foobar"))
        end)
    end
end
# make sure the stackframe from the remote error can be serialized
let ex
    try
        remotecall_fetch(id_other) do
            @eval module AModuleLocalToOther
                foo() = throw(ErrorException("A.error"))
                foo()
            end
        end
    catch ex
    end
    @test (ex::RemoteException).pid == id_other
    @test ((ex.captured::CapturedException).ex::ErrorException).msg == "A.error"
    bt = ex.captured.processed_bt::Array{Any,1}
    @test length(bt) > 1
    frame, repeated = bt[1]::Tuple{StackFrame, Int}
    @test frame.func == :foo
    @test isnull(frame.linfo)
    @test repeated == 1
end
# pmap tests. These need at least 4 worker processes dedicated to them,
# which we currently have since the parallel tests are now spawned as a separate test set.
function unmangle_exception(e)
    while any(x->isa(e, x), [CompositeException, RemoteException, CapturedException])
        if isa(e, CompositeException)
            e = e.exceptions[1].ex
        end
        if isa(e, RemoteException)
            e = e.captured.ex
        end
        if isa(e, CapturedException)
            e = e.ex
        end
    end
    return e
end
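# A quick illustrative check of the unwrapping above (a minimal sketch; it
# assumes a worker is available and that a remote `error` arrives wrapped as a
# RemoteException around a CapturedException):
let e_seen = nothing
    try
        remotecall_fetch(()->error("foobar"), workers()[1])
    catch e
        e_seen = e
    end
    @test unmangle_exception(e_seen).msg == "foobar"
end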
# Test all combinations of pmap keyword args.
pmap_args = [
    (:distributed, [:default, false]),
    (:batch_size, [:default, 2]),
    (:on_error, [:default, e -> unmangle_exception(e).msg == "foobar"]),
    (:retry_on, [:default, e -> unmangle_exception(e).msg == "foobar"]),
    (:retry_n, [:default, typemax(Int)-1]),
    (:retry_max_delay, [0, 0.001])
]
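# walk_args below recursively enumerates every combination of the keyword
# values above (2^6 in total), collects the non-default ones into kwargs, and
# runs pmap once per combination with result checks matched to the chosen options.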
kwdict = Dict()
function walk_args(i)
    if i > length(pmap_args)
        kwargs = []
        for (k,v) in kwdict
            if v !== :default
                push!(kwargs, (k,v))
            end
        end

        data = [1:100...]

        testw = kwdict[:distributed] === false ? [1] : workers()

        if (kwdict[:on_error] === :default) && (kwdict[:retry_n] === :default)
            mapf = x -> (x*2, myid())
            results_test = pmap_res -> begin
                results = [x[1] for x in pmap_res]
                pids = [x[2] for x in pmap_res]
                @test results == [2:2:200...]
                for p in testw
                    @test p in pids
                end
            end
        elseif kwdict[:retry_n] !== :default
            mapf = x -> iseven(myid()) ? error("foobar") : (x*2, myid())
            results_test = pmap_res -> begin
                results = [x[1] for x in pmap_res]
                pids = [x[2] for x in pmap_res]
                @test results == [2:2:200...]
                for p in testw
                    if isodd(p)
                        @test p in pids
                    else
                        @test !(p in pids)
                    end
                end
            end
        else # (kwdict[:on_error] !== :default) && (kwdict[:retry_n] === :default)
            mapf = x -> iseven(x) ? error("foobar") : (x*2, myid())
            results_test = pmap_res -> begin
                w = testw
                for (idx,x) in enumerate(data)
                    if iseven(x)
                        @test pmap_res[idx] == true
                    else
                        @test pmap_res[idx][1] == x*2
                        @test pmap_res[idx][2] in w
                    end
                end
            end
        end

        try
            results_test(pmap(mapf, data; kwargs...))
        catch e
            println("pmap executing with args : ", kwargs)
            rethrow(e)
        end

        return
    end

    kwdict[pmap_args[i][1]] = pmap_args[i][2][1]
    walk_args(i+1)

    kwdict[pmap_args[i][1]] = pmap_args[i][2][2]
    walk_args(i+1)
end
# Start test for various kw arg combinations
walk_args(1)
# Simple test for pmap throws error
error_thrown = false
try
    pmap(x -> x==50 ? error("foobar") : x, 1:100)
catch e
    @test unmangle_exception(e).msg == "foobar"
    error_thrown = true
end
@test error_thrown
# Test pmap with a generator type iterator
@test [1:100...] == pmap(x->x, Base.Generator(x->(sleep(0.0001); x), 1:100))
# Test asyncmap
@test allunique(asyncmap(x->(sleep(1.0);object_id(current_task())), 1:10))
# CachingPool tests
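# A CachingPool ships the mapped function to each worker once and reuses the
# cached copy on subsequent calls; clear!(wp) releases those remote copies,
# which should empty the local object-to-ref cache checked below.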
wp = CachingPool(workers())
@test [1:100...] == pmap(wp, x->x, 1:100)
clear!(wp)
@test length(wp.map_obj2ref) == 0
# The below block of tests is usually run only on local development systems, since:
# - they include tests which print errors
# - the addprocs tests are memory intensive
# - ssh addprocs requires sshd to be running locally with passwordless login enabled.
# The test block is enabled by defining the env variable JULIA_TESTFULL=1
DoFullTest = Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0"))))
if DoFullTest
    # Topology tests need to run externally since a given cluster at any
    # time can only support a single topology and the current session
    # is already running in parallel under the default topology.
    script = joinpath(dirname(@__FILE__), "topology.jl")
    cmd = `$(Base.julia_cmd()) $script`

    (strm, proc) = open(pipeline(cmd, stderr=STDERR))
    wait(proc)
    if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0
        println(readstring(strm))
        error("Topology tests failed : $cmd")
    end

    println("Testing exception printing on remote worker from a `remote_do` call")
    println("Please ensure the remote error and backtrace are displayed on screen")

    Base.remote_do(id_other) do
        throw(ErrorException("TESTING EXCEPTION ON REMOTE DO. PLEASE IGNORE"))
    end
    sleep(0.5)  # Give some time for the above error to be printed

    println("\n\nThe following 'invalid connection credentials' error messages are to be ignored.")
    all_w = workers()
    # Test sending fake data to workers. The worker processes will print an
    # error message but should not terminate.
    for w in Base.PGRP.workers
        if isa(w, Base.Worker)
            s = connect(get(w.config.host), get(w.config.port))
            write(s, randstring(32))
        end
    end
    @test workers() == all_w
    @test all([p == remotecall_fetch(myid, p) for p in all_w])

    if is_unix() # aka have ssh
        function test_n_remove_pids(new_pids)
            for p in new_pids
                w_in_remote = sort(remotecall_fetch(workers, p))
                try
                    @test intersect(new_pids, w_in_remote) == new_pids
                catch e
                    print("p : $p\n")
                    print("newpids : $new_pids\n")
                    print("w_in_remote : $w_in_remote\n")
                    print("intersect : $(intersect(new_pids, w_in_remote))\n\n\n")
                    rethrow(e)
                end
            end

            @test :ok == remotecall_fetch(1, new_pids) do p
                rmprocs(p; waitfor=5.0)
            end
        end

        print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")
        print("Please ensure sshd is running locally with passwordless login enabled.\n")

        sshflags = `-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=ERROR `
        # Issue #9951
        hosts = []
        localhost_aliases = ["localhost", string(getipaddr()), "127.0.0.1"]
        num_workers = parse(Int,(get(ENV, "JULIA_ADDPROCS_NUM", "9")))

        for i in 1:(num_workers/length(localhost_aliases))
            append!(hosts, localhost_aliases)
        end

        print("\nTesting SSH addprocs with $(length(hosts)) workers...\n")
        new_pids = remotecall_fetch(1, hosts, sshflags) do h, sf
            addprocs(h; sshflags=sf)
        end
        @test length(new_pids) == length(hosts)
        test_n_remove_pids(new_pids)

        print("\nMixed ssh addprocs with :auto\n")
        new_pids = sort(remotecall_fetch(1, ["localhost", ("127.0.0.1", :auto), "localhost"], sshflags) do h, sf
            addprocs(h; sshflags=sf)
        end)
        @test length(new_pids) == (2 + Sys.CPU_CORES)
        test_n_remove_pids(new_pids)

        print("\nMixed ssh addprocs with numeric counts\n")
        new_pids = sort(remotecall_fetch(1, [("localhost", 2), ("127.0.0.1", 2), "localhost"], sshflags) do h, sf
            addprocs(h; sshflags=sf)
        end)
        @test length(new_pids) == 5
        test_n_remove_pids(new_pids)

        print("\nssh addprocs with tunnel\n")
        new_pids = sort(remotecall_fetch(1, [("localhost", num_workers)], sshflags) do h, sf
            addprocs(h; tunnel=true, sshflags=sf)
        end)
        @test length(new_pids) == num_workers
        test_n_remove_pids(new_pids)
    end # unix-only
end # full-test
# issue #7727
let A = [], B = []
    t = @task produce(11)
    @sync begin
        @async for x in t; push!(A,x); end
        @async for x in t; push!(B,x); end
    end
    @test (A == [11]) != (B == [11])
end
let t = @task 42
    schedule(t, ErrorException(""), error=true)
    @test_throws ErrorException wait(t)
end
# issue #8207
let A = Any[]
    @parallel (+) for i in (push!(A,1); 1:2)
        i
    end
    @test length(A) == 1
end
# issue #13168
function f13168(n)
    val = 0
    for i=1:n val+=sum(rand(n,n)^2) end
    val
end
let t = schedule(@task f13168(100))
    @test t.state == :queued
    @test_throws ErrorException schedule(t)
    yield()
    @test t.state == :done
    @test_throws ErrorException schedule(t)
    @test isa(wait(t),Float64)
end
# issue #13122
@test remotecall_fetch(identity, workers()[1], C_NULL) === C_NULL
# issue #11062
function t11062()
    @async v11062 = 1
    v11062 = 2