forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsharedarray.jl
338 lines (274 loc) · 10.5 KB
/
sharedarray.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# A dense array of element type T and rank N whose storage is a shared memory
# segment mapped by every process listed in `pids`.
#
# The inner constructor only sets the first four fields (dims, pids, refs,
# segname). `s`, `pidx` and `loc_subarr_1d` are left unassigned and are
# filled in afterwards (see init_loc_flds).
type SharedArray{T,N} <: DenseArray{T,N}
# Size of the array; this is what size(S) returns.
dims::NTuple{N,Int}
# Ids of the processes that have mapped the segment.
pids::Vector{Int}
# One RemoteRef per entry of `pids`, each referring to that process's mapping.
refs::Array{RemoteRef}
# The segname is currently used only in the test scripts to ensure that
# the shmem segment has been unlinked.
segname::String
# Fields below are not to be serialized
# Local shmem map.
s::Array{T,N}
# Index of the current process's pid in the pids vector, 0 if this shared
# array is not mapped locally.
pidx::Int
# The local partition of the array when viewed as a one-dimensional array.
# This can be removed when @parallel or its equivalent supports looping on
# a subset of workers.
loc_subarr_1d
# Inner constructor: d = dims, p = pids, r = refs, sn = segment name.
SharedArray(d,p,r,sn) = new(d,p,r,sn)
end
# Construct a SharedArray with element type `T` and size `dims`, backed by a
# shared memory segment that every process in `pids` maps.
#
# Keyword arguments:
#   init - if it is a Function, it is called as init(S) on every process in
#          pids once the array is set up; any other value (default false)
#          skips initialization.
#   pids - processes that map the array; when empty, procs(myid()) is used.
#
# Throws an error if T is not a bits type, on Windows, or (through
# assert_same_host) if the requested pids are not all on one machine.
# Returns the new SharedArray.
function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])
N = length(dims)
isbits(T) || error("Type of Shared Array elements must be bits types")
@windows_only error(" SharedArray is not supported on Windows yet.")
if isempty(pids)
# only use workers on the current host
pids = procs(myid())
onlocalhost = true
else
# assert_same_host errors unless all pids share a host; its return value
# says whether the calling process is itself one of the pids.
onlocalhost = assert_same_host(pids)
end
# The segment name doubles as a "still needs unlinking" flag for the
# finally block below: it is reset to "" once the segment has been unlinked
# and the SharedArray object exists.
local shm_seg_name = ""
local s
local S = nothing
local shmmem_create_pid
try
# On OSX, the shm_seg_name length must be < 32 characters
shm_seg_name = string("/jl", getpid(), int64(time() * 10^9))
if onlocalhost
# Create (and map) the segment in the calling process.
shmmem_create_pid = myid()
s = shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR)
else
# The calling process does not map the array: create the segment on the
# first requested pid instead. The closure returns nothing so that the
# mapped array itself is not sent back.
shmmem_create_pid = pids[1]
remotecall_fetch(pids[1], () -> begin shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR); nothing end)
end
# Ask every participating process to map the existing segment; the resulting
# RemoteRefs are kept in the SharedArray as `refs`.
func_mapshmem = () -> shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR)
refs = Array(RemoteRef, length(pids))
for (i, p) in enumerate(pids)
refs[i] = remotecall(p, func_mapshmem)
end
# Wait till all the workers have mapped the segment
for i in 1:length(refs)
wait(refs[i])
end
# All good, immediately unlink the segment.
if onlocalhost
shm_unlink(shm_seg_name)
else
remotecall_fetch(shmmem_create_pid, shm_unlink, shm_seg_name)
end
S = SharedArray{T,N}(dims, pids, refs, shm_seg_name)
# Mark the segment as already unlinked so the finally block leaves it alone.
# NOTE(review): a failure between the unlink above and this line would make
# the finally block try to unlink a second time - confirm this is acceptable.
shm_seg_name = ""
if onlocalhost
init_loc_flds(S)
# In the event that myid() is not part of pids, s will not be set
# in the init function above, hence setting it here if available.
S.s = s
else
S.pidx = 0
end
# if present init function is called on each of the parts
if isa(init, Function)
# Run init on all pids concurrently and wait for every call to finish.
@sync begin
for p in pids
@async remotecall_wait(p, init, S)
end
end
end
finally
# Still holding a segment name means setup did not get as far as the
# bookkeeping above; unlink the segment on the process that created it.
if shm_seg_name != ""
remotecall_fetch(shmmem_create_pid, shm_unlink, shm_seg_name)
end
end
S
end
# Varargs form: SharedArray(T, d1, d2, ...) packs the integer dimensions into a
# tuple and forwards them, together with any keyword arguments, to the
# tuple-based constructor.
function SharedArray(T, I::Int...; kwargs...)
    return SharedArray(T, I; kwargs...)
end
# Total number of elements, computed from the stored dimensions.
function length(S::SharedArray)
    return prod(S.dims)
end

# Dimensions of the shared array.
function size(S::SharedArray)
    return S.dims
end

# Ids of the processes that have mapped the shared array.
function procs(S::SharedArray)
    return S.pids
end

# Position of the current process in procs(S); 0 when not mapped locally.
function indexpids(S::SharedArray)
    return S.pidx
end

# The locally mapped backing array of a SharedArray.
function sdata(S::SharedArray)
    return S.s
end

# Any other array is returned unchanged.
function sdata(A::AbstractArray)
    return A
end
# Linear index range assigned to the current process (see range_1dim).
# Raises an error when the array is not mapped in this process (pidx is 0).
function localindexes(S::SharedArray)
    if S.pidx <= 0
        error("SharedArray is not mapped to this process")
    end
    return range_1dim(S, S.pidx)
end
# Pointer conversion: delegate to the locally mapped backing array.
convert{T}(::Type{Ptr{T}}, S::SharedArray) = convert(Ptr{T}, sdata(S))

# Array -> SharedArray conversions.
#
# Each method allocates a new SharedArray of the same size as `A`, copies the
# contents of `A` into it and returns the SharedArray. The result is returned
# explicitly rather than relying on the return value of copy!, so that the
# caller always receives the SharedArray and never its backing Array.

# Keep the element type of `A`.
function convert(::Type{SharedArray}, A::Array)
    S = SharedArray(eltype(A), size(A))
    copy!(S, A)
    return S
end

# Convert to a SharedArray with element type `T`.
function convert{T}(::Type{SharedArray{T}}, A::Array)
    S = SharedArray(T, size(A))
    copy!(S, A)
    return S
end

# Convert to a SharedArray with element type `TS` and the same rank as `A`.
function convert{TS,TA,N}(::Type{SharedArray{TS,N}}, A::Array{TA,N})
    S = SharedArray(TS, size(A))
    copy!(S, A)
    return S
end
# deepcopy support. `stackdict` maps objects that were already copied to their
# copies, so a SharedArray that was seen before is returned from there instead
# of being copied a second time.
function deepcopy_internal(S::SharedArray, stackdict::ObjectIdDict)
    if !haskey(stackdict, S)
        # Note: copy can be used here because SharedArrays are restricted to
        # isbits types
        stackdict[S] = copy(S)
    end
    return stackdict[S]
end
# Linear index range owned by the participant at position `pidx` of S.pids.
#
# With fewer elements than participants, the first length(S) participants get
# one element each and the rest get the empty range 1:0. Otherwise every
# participant gets div(length(S), number of pids) consecutive elements, and
# the last one additionally takes whatever remains up to the end.
function range_1dim(S::SharedArray, pidx)
    total = length(S)
    nparts = length(S.pids)
    chunk = div(total, nparts)
    if total < nparts
        return pidx <= total ? (pidx:pidx) : (1:0)
    end
    lo = ((pidx-1) * chunk) + 1
    if pidx == nparts
        return lo:total
    end
    return lo:(pidx*chunk)
end
# Sub-array of the local mapping covering the linear range that range_1dim
# assigns to participant `pidx`.
function sub_1dim(S::SharedArray, pidx)
    return sub(S.s, range_1dim(S, pidx))
end
# Fill in the process-local fields of `S` (pidx, s, loc_subarr_1d).
# When the current process is not one of S.pids only pidx is set, to 0.
function init_loc_flds(S)
    me = myid()
    if !(me in S.pids)
        S.pidx = 0
    else
        S.pidx = findfirst(S.pids, me)
        # The ref at our own position holds this process's mapping.
        S.s = fetch(S.refs[S.pidx])
        S.loc_subarr_1d = sub_1dim(S, S.pidx)
    end
end
# Serialize a SharedArray field by field. The process-local fields are not
# sent: `s` is the complete array, and `pidx` and `loc_subarr_1d` are only
# meaningful in the current process. An undefined-reference tag is written
# in their place.
function serialize(s, S::SharedArray)
    serialize_type(s, typeof(S))
    fields = SharedArray.names
    serialize(s, length(fields))
    for fld in fields
        if fld == :s || fld == :pidx || fld == :loc_subarr_1d
            writetag(s, UndefRefTag)
        else
            serialize(s, getfield(S, fld))
        end
    end
end
# Deserialize a SharedArray with the generic DataType method, then rebuild
# the process-local fields. Errors if the receiving process is not one of the
# array's pids.
function deserialize{T,N}(s, t::Type{SharedArray{T,N}})
    S = invoke(deserialize, (Any, DataType), s, t)
    init_loc_flds(S)
    S.pidx == 0 && error("SharedArray cannot be used on a non-participating process")
    return S
end
# Converting to Array hands back the locally mapped array itself (no copy).
function convert(::Type{Array}, S::SharedArray)
    return S.s
end
# Pass getindex and setindex! straight through to the locally mapped array
# S.s - unlike DArrays they always work on the complete array.
# Every setindex! method returns the SharedArray S itself.
# Indexing with no index.
getindex(S::SharedArray) = getindex(S.s)
# Indexing with a single Real index.
getindex(S::SharedArray, I::Real) = getindex(S.s, I)
# Indexing with an array of indices.
getindex(S::SharedArray, I::AbstractArray) = getindex(S.s, I)
# Indexing with several indices; presumably @nsplat expands this into methods
# for one to five index arguments - confirm against the @nsplat definition.
@nsplat N 1:5 getindex(S::SharedArray, I::NTuple{N,Any}...) = getindex(S.s, I...)
# Assignment with no index.
setindex!(S::SharedArray, x) = (setindex!(S.s, x); S)
# Assignment at a single Real index.
setindex!(S::SharedArray, x, I::Real) = (setindex!(S.s, x, I); S)
# Assignment at an array of indices.
setindex!(S::SharedArray, x, I::AbstractArray) = (setindex!(S.s, x, I); S)
# Assignment with several indices (see the note on @nsplat above).
@nsplat N 1:5 setindex!(S::SharedArray, x, I::NTuple{N,Any}...) = (setindex!(S.s, x, I...); S)
# convenience constructors

# Create a SharedArray of size `dims` whose element type is typeof(v), with
# each participating process filling its own local partition with `v`.
# Keyword arguments are forwarded to the SharedArray constructor.
function shmem_fill(v, dims; kwargs...)
    fill_local = S -> fill!(S.loc_subarr_1d, v)
    return SharedArray(typeof(v), dims; init = fill_local, kwargs...)
end

# Varargs form: shmem_fill(v, d1, d2, ...).
function shmem_fill(v, I::Int...; kwargs...)
    return shmem_fill(v, I; kwargs...)
end
# rand variant with range
#
# Create a SharedArray of size `dims` in which every participating process
# fills its local partition with values drawn by rand(TR). `TR` is either an
# element type, which is also the element type of the result, or a UnitRange,
# in which case the result has element type Int.
function shmem_rand(TR::Union(DataType, UnitRange), dims; kwargs...)
    elty = isa(TR, UnitRange) ? Int : TR
    fill_rand = S -> map!((x)->rand(TR), S.loc_subarr_1d)
    return SharedArray(elty, dims; init = fill_rand, kwargs...)
end

# Single integer dimension.
function shmem_rand(TR::Union(DataType, UnitRange), i::Int; kwargs...)
    return shmem_rand(TR, (i,); kwargs...)
end

# Varargs dimensions.
function shmem_rand(TR::Union(DataType, UnitRange), I::Int...; kwargs...)
    return shmem_rand(TR, I; kwargs...)
end

# Without a type or range, values are Float64.
function shmem_rand(dims; kwargs...)
    return shmem_rand(Float64, dims; kwargs...)
end

# Varargs dimensions without a type or range.
function shmem_rand(I::Int...; kwargs...)
    return shmem_rand(I; kwargs...)
end
# Create a Float64 SharedArray of size `dims` in which every participating
# process fills its local partition with values drawn by randn().
function shmem_randn(dims; kwargs...)
    fill_randn = S-> map!((x)->randn(), S.loc_subarr_1d)
    return SharedArray(Float64, dims; init = fill_randn, kwargs...)
end

# Varargs form: shmem_randn(d1, d2, ...).
function shmem_randn(I::Int...; kwargs...)
    return shmem_randn(I; kwargs...)
end
# similar for SharedArrays: the result is a new SharedArray mapped on the same
# processes as `S`. Element type and dimensions default to those of `S`.
function similar(S::SharedArray, T, dims::Dims)
    return SharedArray(T, dims; pids=procs(S))
end

# Same size, different element type.
function similar(S::SharedArray, T)
    return similar(S, T, size(S))
end

# Same element type, different size.
function similar(S::SharedArray, dims::Dims)
    return similar(S, eltype(S), dims)
end

# Same element type and size.
function similar(S::SharedArray)
    return similar(S, eltype(S), size(S))
end
# Non-mutating map: copy `S` into a new SharedArray of the same shape, apply
# `f` to the copy in place with map!, and return the copy.
function map(f::Callable, S::SharedArray)
    result = similar(S)
    result[:] = S[:]
    map!(f, result)
    return result
end
# Two-stage reduction: every process in procs(S) reduces its own local
# partition with `f`, then the fetched partial results are combined with `f`.
function reduce(f::Function, S::SharedArray)
    partials = { @spawnat p reduce(f, S.loc_subarr_1d) for p in procs(S) }
    return mapreduce(fetch, f, partials)
end
# In-place map: every process in procs(S) applies `f` to the elements at its
# own local indexes. Returns `S` after all processes have finished.
function map!(f::Callable, S::SharedArray)
    @sync begin
        for p in procs(S)
            @spawnat p begin
                for i in localindexes(S)
                    S.s[i] = f(S.s[i])
                end
            end
        end
    end
    return S
end
# Copy the contents of the ordinary Array `A` into the locally mapped storage
# of `S`. Returns the destination SharedArray `S` (not its backing Array), in
# line with copy!(S, R::SharedArray) and the setindex! methods, which also
# return the SharedArray.
function copy!(S::SharedArray, A::Array)
    copy!(S.s, A)
    return S
end
# Copy SharedArray `R` into SharedArray `S` in parallel. The work is split
# into contiguous linear index ranges, one per process that maps both arrays
# (at most one process per element); the last range takes the remainder.
# Throws BoundsError if the lengths differ and errors if the two arrays have
# no process in common. Returns `S`.
function copy!(S::SharedArray, R::SharedArray)
    if length(S) != length(R)
        throw(BoundsError())
    end
    common = intersect(procs(S), procs(R))
    if isempty(common)
        error("source and destination arrays don't share any process")
    end
    total = length(S)
    if length(common) > total
        common = common[1:total]
    end
    nparts = length(common)
    chunk = div(total, nparts)
    @sync for i = 1:nparts
        p = common[i]
        lo = (i-1)*chunk + 1
        hi = i == nparts ? total : i*chunk
        idx = lo:hi
        @spawnat p begin
            S.s[idx] = R.s[idx]
        end
    end
    return S
end
# Best-effort diagnostic: print the system shared memory limits reported by
# `sysctl` and `getconf` next to the requested size `slen` (in bytes).
# Any error raised while gathering or printing the numbers is ignored.
function print_shmem_limits(slen)
    try
        @linux_only pfx = "kernel"
        @osx_only pfx = "kern.sysv"

        # Run a command and parse the last whitespace-separated token of its
        # output as an integer.
        lastint = cmd -> int(split(readall(readsfrom(cmd)[1]))[end])

        shmmax_MB = div(lastint(`sysctl $(pfx).shmmax`), 1024*1024)
        page_size = lastint(`getconf PAGE_SIZE`)
        # shmall is multiplied by the page size to turn it into bytes.
        shmall_MB = div(lastint(`sysctl $(pfx).shmall`) * page_size, 1024*1024)

        println("System max size of single shmem segment(MB) : ", shmmax_MB,
            "\nSystem max size of all shmem segments(MB) : ", shmall_MB,
            "\nRequested size(MB) : ", div(slen, 1024*1024),
            "\nPlease ensure requested size is within system limits.",
            "\nIf not, increase system limits and try again."
        )
    catch e
        nothing # Ignore any errors in this...
    end
end
# utilities
# Open the shared memory segment `shm_seg_name` with the open flags `mode`
# and map it as an Array of element type `T` and size `dims`.
#
# When `mode` includes JL_O_CREAT the segment is first sized to
# prod(dims)*sizeof(T) bytes. The stream wrapping the descriptor is always
# closed before returning. On any failure the system shared memory limits are
# printed and the exception is rethrown.
#
# Returns the mapped array.
function shm_mmap_array(T, dims, shm_seg_name, mode)
    local s = nothing
    local A = nothing
    nbytes = prod(dims)*sizeof(T)
    try
        fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR)
        # Failure is signalled by a negative return value; 0 is a valid descriptor.
        systemerror("shm_open() failed for " * shm_seg_name, fd_mem < 0)
        # Passing `true` lets the stream own the descriptor, so close(s) releases it.
        s = fdio(fd_mem, true)

        # On OSX, ftruncate must be used to set the size of the segment, just
        # lseek does not work, and only at creation time.
        if (mode & JL_O_CREAT) == JL_O_CREAT
            # ftruncate takes and returns a C int; declare it as such so the
            # status is not read from an over-wide return value.
            rc = ccall(:ftruncate, Cint, (Cint, Int), fd_mem, nbytes)
            systemerror("ftruncate() failed for shm segment " * shm_seg_name, rc != 0)
        end

        A = mmap_array(T, dims, s, zero(FileOffset), grow=false)
    catch e
        print_shmem_limits(nbytes)
        rethrow(e)
    finally
        if s !== nothing
            close(s)
        end
    end
    A
end
@unix_only begin
    # Remove the named shared memory segment by calling the C function
    # shm_unlink. Raises a system error when the call reports failure;
    # otherwise returns the status code of the call.
    function shm_unlink(shm_seg_name)
        status = ccall(:shm_unlink, Cint, (Ptr{Uint8},), shm_seg_name)
        failed = status != 0
        systemerror("Error unlinking shmem segment " * shm_seg_name, failed)
        return status
    end
end
# Thin wrapper around the C function shm_open. `oflags` are the open flags and
# `permissions` the mode bits used when a segment is created.
#
# The C function returns an `int`, so the call is declared with Cint. Declaring
# a wider return type can leave a -1 failure value without sign extension, in
# which case callers checking for a negative result would miss the failure.
# The result is widened to Int afterwards so callers still receive an Int:
# a descriptor on success, or -1 on failure.
@unix_only function shm_open(shm_seg_name, oflags, permissions)
    fd = ccall(:shm_open, Cint, (Ptr{Uint8}, Cint, Cint),
               shm_seg_name, oflags, permissions)
    return int(fd)
end
# Check that every process in `procs` reports the same private IP address as
# the first one, and raise an error otherwise.
# Returns true when the calling process is itself one of `procs`.
function assert_same_host(procs)
    first_privip = getprivipaddr(procs[1])
    for p in procs
        if getprivipaddr(p) != first_privip
            error("SharedArray requires all requested processes to be on the same machine.")
        end
    end
    return myid() in procs
end