Skip to content

Commit

Permalink
Merge pull request JuliaLang#13694 from JuliaLang/jcb/sshmancleanup
Browse files Browse the repository at this point in the history
Add more docs about what is going on in SSHManager
  • Loading branch information
jakebolewski committed Oct 21, 2015
2 parents 520a3fe + 6edd583 commit e736de2
Showing 1 changed file with 73 additions and 24 deletions.
97 changes: 73 additions & 24 deletions base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
# Wait for all launches to complete.
launch_tasks = cell(length(manager.machines))

for (i,(machine, cnt)) in enumerate(manager.machines)
for (i,(machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
launch_tasks[i] = @schedule try
launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy)
Expand All @@ -85,30 +85,63 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
exeflags = params[:exeflags]

# machine could be of the format [user@]host[:port] bind_addr[:bind_port]
# machine format string is split on whitespace
machine_bind = split(machine)
if isempty(machine_bind)
throw(ArgumentError("invalid machine definition format string: \"$machine\$"))
end
if length(machine_bind) > 1
exeflags = `--bind-to $(machine_bind[2]) $exeflags`
end
exeflags = `$exeflags --worker`

machine_def = machine_bind[1]
machine_def = split(machine_def, ':')
portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : ``
sshflags = `$(params[:sshflags]) $portopt`

machine_def = split(machine_bind[1], ':')
# if this machine def has a port number, add the port information to the ssh flags
if length(machine_def) > 2
throw(ArgumentError("invalid machine defintion format string: invalid port format \"$machine_def\""))
end
host = machine_def[1]
portopt = ``
if length(machine_def) == 2
portstr = machine_def[2]
if !isinteger(portstr) || (p = parse(Int,portstr); p < 1 || p > 65535)
msg = "invalid machine definition format string: invalid port format \"$machine_def\""
throw(ArgumentError(msg))
end
portopt = ` -p $(machine_def[2]) `
end
sshflags = `$(params[:sshflags]) $portopt`

# Build up the ssh command
tval = haskey(ENV, "JULIA_WORKER_TIMEOUT") ? `export JULIA_WORKER_TIMEOUT=$(ENV["JULIA_WORKER_TIMEOUT"]);` : ``

cmd = `cd $dir && $tval $exename $exeflags` # launch julia
cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under
cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch
# the default worker timeout
tval = haskey(ENV, "JULIA_WORKER_TIMEOUT") ?
`export JULIA_WORKER_TIMEOUT=$(ENV["JULIA_WORKER_TIMEOUT"]);` : ``

# Julia process with passed in command line flag arguments
cmd = `cd $dir && $tval $exename $exeflags`

# launch
# shell login (-l) with string command (-c) to launch julia process
cmd = `sh -l -c $(shell_escape(cmd))`

# remote launch with ssh with given ssh flags / host / port information
# -T → disable pseudo-terminal allocation
# -a → disable forwarding of auth agent connection
# -x → disable X11 forwarding
# -o ClearAllForwardings → option if forwarding connections and
# forwarded connections are causing collisions
# -n → Redirects stdin from /dev/null (actually, prevents reading from stdin).
# Used when running ssh in the background.
cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))`

# launch the remote Julia process

# detach launches the command in a new process group, allowing it to outlive
# the initial julia process (Ctrl-C and teardown methods are handled through messages)
# for the launched porcesses.
io, pobj = open(detach(cmd), "r")
wconfig = WorkerConfig()

wconfig = WorkerConfig()
wconfig.io = io
wconfig.host = host
wconfig.tunnel = params[:tunnel]
Expand All @@ -130,11 +163,11 @@ function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symb
host = get(config.host)
sshflags = get(config.sshflags)
if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`)
println("Error sending a Ctrl-C to julia worker $id on $host")
warn(STDERR,"error sending a Ctrl-C to julia worker $id on $host")
end
else
# This state can happen immediately after an addprocs
println("Worker $id cannot be presently interrupted.")
warn(STDERR,"worker $id cannot be presently interrupted.")
end
end
end
Expand All @@ -153,18 +186,33 @@ let tunnel_port = 9201
end


# establish an SSH tunnel to a remote worker
# returns P such that localhost:P connects to host:port
"""
ssh_tunnel(user, host, bind_addr, port, sshflags) -> localport
Establish an SSH tunnel to a remote worker.
Returns a port number `localport` such that `localhost:localport` connects to `host:port`.
"""
function ssh_tunnel(user, host, bind_addr, port, sshflags)
localp = next_tunnel_port()
ntries = cnt = 100
while !success(detach(`ssh -T -a -x -o ExitOnForwardFailure=yes -f $sshflags $(user)@$host -L $localp:$bind_addr:$(Int(port)) sleep 60`)) && cnt > 0
localp = next_tunnel_port()
port = Int(port)
cnt = 100
localport = next_tunnel_port()
# if we cannot do port forwarding, bail immediately
# the connection is forwarded to `port` on the remote server over the local port `localp`
# the -f option backgrounds the ssh session
# `sleep 60` command specifies that an alloted time of 60 seconds is allowed to start the
# remote julia process and estabilish the network connections specified the the process topology.
# If no connections are made within 60 seconds, ssh will exit and an error will be printed on the
# process that launched the remote process.
ssh = `ssh -T -a -x -o ExitOnForwardFailure=yes`
while !success(detach(`$ssh -f $sshflags $user@$host -L $localp:$bind_addr:$port sleep 60`)) && cnt > 0
localport = next_tunnel_port()
cnt -= 1
end
(cnt == 0) && error("Unable to create SSH tunnel after $cnt tries. No free port?")

localp
if cnt == 0
throw(ErrorException(
"unable to create SSH tunnel after $cnt tries. No free port?"))
end
return localport
end


Expand Down Expand Up @@ -242,7 +290,8 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
if haskey(ENV, "USER")
user = ENV["USER"]
elseif tunnel
error("USER must be specified either in the environment or as part of the hostname when tunnel option is used")
error("USER must be specified either in the environment ",
"or as part of the hostname when tunnel option is used")
end
end

Expand Down

0 comments on commit e736de2

Please sign in to comment.