diff --git a/base/managers.jl b/base/managers.jl index 7f8131f56a2a8..db062af844f18 100644 --- a/base/managers.jl +++ b/base/managers.jl @@ -58,7 +58,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: # Wait for all launches to complete. launch_tasks = cell(length(manager.machines)) - for (i,(machine, cnt)) in enumerate(manager.machines) + for (i,(machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt launch_tasks[i] = @schedule try launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy) @@ -85,30 +85,63 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched, exeflags = params[:exeflags] # machine could be of the format [user@]host[:port] bind_addr[:bind_port] + # machine format string is split on whitespace machine_bind = split(machine) + if isempty(machine_bind) + throw(ArgumentError("invalid machine definition format string: \"$machine\$")) + end if length(machine_bind) > 1 exeflags = `--bind-to $(machine_bind[2]) $exeflags` end exeflags = `$exeflags --worker` - machine_def = machine_bind[1] - machine_def = split(machine_def, ':') - portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : `` - sshflags = `$(params[:sshflags]) $portopt` - + machine_def = split(machine_bind[1], ':') + # if this machine def has a port number, add the port information to the ssh flags + if length(machine_def) > 2 + throw(ArgumentError("invalid machine defintion format string: invalid port format \"$machine_def\"")) + end host = machine_def[1] + portopt = `` + if length(machine_def) == 2 + portstr = machine_def[2] + if !isinteger(portstr) || (p = parse(Int,portstr); p < 1 || p > 65535) + msg = "invalid machine definition format string: invalid port format \"$machine_def\"" + throw(ArgumentError(msg)) + end + portopt = ` -p $(machine_def[2]) ` + end + sshflags = `$(params[:sshflags]) $portopt` # Build up the ssh command - tval = haskey(ENV, "JULIA_WORKER_TIMEOUT") ? `export JULIA_WORKER_TIMEOUT=$(ENV["JULIA_WORKER_TIMEOUT"]);` : `` - cmd = `cd $dir && $tval $exename $exeflags` # launch julia - cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under - cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch + # the default worker timeout + tval = haskey(ENV, "JULIA_WORKER_TIMEOUT") ? + `export JULIA_WORKER_TIMEOUT=$(ENV["JULIA_WORKER_TIMEOUT"]);` : `` + + # Julia process with passed in command line flag arguments + cmd = `cd $dir && $tval $exename $exeflags` - # launch + # shell login (-l) with string command (-c) to launch julia process + cmd = `sh -l -c $(shell_escape(cmd))` + + # remote launch with ssh with given ssh flags / host / port information + # -T → disable pseudo-terminal allocation + # -a → disable forwarding of auth agent connection + # -x → disable X11 forwarding + # -o ClearAllForwardings → option if forwarding connections and + # forwarded connections are causing collisions + # -n → Redirects stdin from /dev/null (actually, prevents reading from stdin). + # Used when running ssh in the background. + cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` + + # launch the remote Julia process + + # detach launches the command in a new process group, allowing it to outlive + # the initial julia process (Ctrl-C and teardown methods are handled through messages) + # for the launched porcesses. io, pobj = open(detach(cmd), "r") - wconfig = WorkerConfig() + wconfig = WorkerConfig() wconfig.io = io wconfig.host = host wconfig.tunnel = params[:tunnel] @@ -130,11 +163,11 @@ function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symb host = get(config.host) sshflags = get(config.sshflags) if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`) - println("Error sending a Ctrl-C to julia worker $id on $host") + warn(STDERR,"error sending a Ctrl-C to julia worker $id on $host") end else # This state can happen immediately after an addprocs - println("Worker $id cannot be presently interrupted.") + warn(STDERR,"worker $id cannot be presently interrupted.") end end end @@ -153,18 +186,33 @@ let tunnel_port = 9201 end -# establish an SSH tunnel to a remote worker -# returns P such that localhost:P connects to host:port +""" + ssh_tunnel(user, host, bind_addr, port, sshflags) -> localport + +Establish an SSH tunnel to a remote worker. +Returns a port number `localport` such that `localhost:localport` connects to `host:port`. +""" function ssh_tunnel(user, host, bind_addr, port, sshflags) - localp = next_tunnel_port() - ntries = cnt = 100 - while !success(detach(`ssh -T -a -x -o ExitOnForwardFailure=yes -f $sshflags $(user)@$host -L $localp:$bind_addr:$(Int(port)) sleep 60`)) && cnt > 0 - localp = next_tunnel_port() + port = Int(port) + cnt = 100 + localport = next_tunnel_port() + # if we cannot do port forwarding, bail immediately + # the connection is forwarded to `port` on the remote server over the local port `localp` + # the -f option backgrounds the ssh session + # `sleep 60` command specifies that an alloted time of 60 seconds is allowed to start the + # remote julia process and estabilish the network connections specified the the process topology. + # If no connections are made within 60 seconds, ssh will exit and an error will be printed on the + # process that launched the remote process. + ssh = `ssh -T -a -x -o ExitOnForwardFailure=yes` + while !success(detach(`$ssh -f $sshflags $user@$host -L $localp:$bind_addr:$port sleep 60`)) && cnt > 0 + localport = next_tunnel_port() cnt -= 1 end - (cnt == 0) && error("Unable to create SSH tunnel after $cnt tries. No free port?") - - localp + if cnt == 0 + throw(ErrorException( + "unable to create SSH tunnel after $cnt tries. No free port?")) + end + return localport end @@ -242,7 +290,8 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) if haskey(ENV, "USER") user = ENV["USER"] elseif tunnel - error("USER must be specified either in the environment or as part of the hostname when tunnel option is used") + error("USER must be specified either in the environment ", + "or as part of the hostname when tunnel option is used") end end