Skip to content

Commit

Permalink
state cleanup and exception for exited workers. fixes JuliaLang#3436
Browse files Browse the repository at this point in the history
now the tests are able to complete and pass even if a worker exits,
  which we might not actually want!
closes JuliaLang#3050 (rmprocs)
fixes JuliaLang#2793 (hang when remote process exits)
  • Loading branch information
JeffBezanson committed Jul 12, 2013
1 parent 434f5c0 commit f752fae
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 23 deletions.
2 changes: 1 addition & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export

# Exceptions
ArgumentError,
DisconnectException,
ProcessExitedException,
EOFError,
ErrorException,
KeyError,
Expand Down
64 changes: 43 additions & 21 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ function rmprocs(args...)

for i in [args...]
if haskey(map_pid_wrkr, i)
remotecall(i, exit)
remote_do(i, exit)
end
end
end
Expand All @@ -247,6 +247,9 @@ end
# One-argument convenience method: look up worker `i` using the process group `PGRP`.
worker_from_id(i) = worker_from_id(PGRP, i)
function worker_from_id(pg::ProcessGroup, i)
# Processes with pids greater than ours have to connect to us, which may not have happened yet, so wait for some time.
if myid()==1 && !haskey(map_pid_wrkr,i)
error("no process with id $i exists")
end
start = time()
while (!haskey(map_pid_wrkr, i) && ((time() - start) < 60.0))
sleep(0.1)
Expand Down Expand Up @@ -282,6 +285,29 @@ function deregister_worker(pg, pid)
pg.workers = filter(x -> !(x.id == pid), pg.workers)
w = delete!(map_pid_wrkr, pid, nothing)
if isa(w, Worker) delete!(map_sock_wrkr, w.socket) end

# delete this worker from our RemoteRef client sets
ids = {}
for (id,rv) in pg.refs
if contains(rv.clientset,pid)
push!(ids, id)
end
end
for id in ids
del_client(pg, id, pid)
end

# throw exception to tasks waiting for this pid
rrs = {}
for (rr,cv) in Waiting
if rr.where == pid
notify(cv, ProcessExitedException())
push!(rrs, rr)
end
end
for rr in rrs
delete!(Waiting, rr)
end
end

## remote refs ##
Expand Down Expand Up @@ -451,11 +477,10 @@ end

# wait on a local proxy condition for a remote ref
function wait_full(rr::RemoteRef)
# NOTE(review): this hunk is a diff rendering without +/- markers; the
# `oid`-keyed lines and the `rr`-keyed lines look like the before/after
# versions of the same statements -- confirm against the committed file.
oid = rr2id(rr)
cv = get(Waiting, oid, false)
# look up the condition already registered for this ref; `false` means none
cv = get(Waiting, rr, false)
if cv === false
# no condition registered yet: create one and record it in `Waiting`
cv = Condition()
Waiting[oid] = cv
Waiting[rr] = cv
end
# suspend on the condition until a `notify` on it delivers a value
wait(cv)
end
Expand Down Expand Up @@ -737,8 +762,6 @@ function accept_handler(server::TcpServer, status::Int32)
create_message_handler_loop(client)
end

type DisconnectException <: Exception end

# schedule an expression to run asynchronously, with minimal ceremony
macro schedule(expr)
expr = localize_vars(:(()->($expr)), false)
Expand Down Expand Up @@ -784,13 +807,14 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
# used to deliver result of wait or fetch
mkind = deserialize(sock)
oid = deserialize(sock)
rr = WeakRemoteRef(0, oid[1], oid[2])
#print("$(myid()) got $msg $oid\n")
val = deserialize(sock)
cv = get(Waiting, oid, false)
cv = get(Waiting, rr, false)
if cv !== false
notify(cv, val)
if isempty(cv.waitq)
delete!(Waiting, oid)
delete!(Waiting, rr)
end
end
elseif is(msg, :identify_socket)
Expand Down Expand Up @@ -1230,22 +1254,25 @@ function pmap(f, lsts...)
np = nprocs()
n = length(lsts[1])
results = cell(n)
queue = [1:n]
i = 1
# function to produce the next work item from the queue.
# in this case it's just an index.
nextidx() = (idx=i; i+=1; idx)
@sync begin
for p=1:np
wpid = PGRP.workers[p].id
if wpid != myid() || np == 1
@async begin
while true
idx = nextidx()
if idx > n
if isempty(queue)
break
end
idx = shift!(queue)
try
results[idx] = remotecall_fetch(wpid, f,
map(L->L[idx], lsts)...)
catch
push!(queue, idx)
break
end
results[idx] = remotecall_fetch(wpid, f,
map(L->L[idx], lsts)...)
end
end
end
Expand Down Expand Up @@ -1413,12 +1440,7 @@ function event_loop(isclient)
catch err
iserr, lasterr = true, err
bt = catch_backtrace()
if isa(err,DisconnectException)
# TODO: wake up tasks waiting for failed process
if !isclient
return
end
elseif isclient && isa(err,InterruptException)
if isclient && isa(err,InterruptException)
# Root task is waiting for something on the client. Allow Ctrl-C
# to interrupt.
interrupt_waiting_task(roottask,err)
Expand Down
4 changes: 3 additions & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ end

## condition variables

# Exception passed via `notify` to tasks waiting on a remote ref whose
# process (`rr.where`) is being deregistered; `wait` raises it in the waiter.
type ProcessExitedException <: Exception end

type Condition
waitq::Vector{Any}

Expand All @@ -89,7 +91,7 @@ function wait(c::Condition)
ct.runnable = false
args = yield(c)

if isa(args,InterruptException)
if isa(args,InterruptException) || isa(args,ProcessExitedException)
filter!(x->x!==ct, c.waitq)
error(args)
end
Expand Down

0 comments on commit f752fae

Please sign in to comment.