Skip to content

Commit

Permalink
add connection timeout and continue hosts of failure
Browse files Browse the repository at this point in the history
  • Loading branch information
umputun committed May 1, 2023
1 parent 0623ca0 commit 04b0fe4
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 45 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ SimploTask supports the following command-line options:
If not specified all the tasks will be executed.
- `-d`, `--target=`: Specifies the target name to use for the task execution. The target should be defined in the playbook file and can represent remote hosts, inventory files, or inventory URLs. If not specified the `default` target will be used. User can pass a host name or IP instead of the target name for a quick override. Providing the `-d`, `--target` flag multiple times with different targets sets multiple destination targets or multiple hosts, e.g., `-d prod -d dev` or `-d example1.com -d example2.com`.
- `-c`, `--concurrent=`: Sets the number of concurrent hosts to execute tasks. Defaults to `1`, which means hosts will be handled sequentially.
- `--ssh-timeout=`: Sets the SSH connection timeout. Defaults to `30s`.
- `-f`, `--filter=`: Filters destinations for the specified target. Providing the `-f` flag multiple times with different names, host names, or IPs/FQDNs selects multiple destination hosts from the selected target, e.g., `-f apollo -f h2.example2.com`
- `--inventory-file=`: Specifies the inventory file to use for the task execution. Overrides the inventory file defined in the
playbook file.
Expand Down
8 changes: 5 additions & 3 deletions app/executor/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
"net"
"os"
"strings"
"time"

"golang.org/x/crypto/ssh"
)

// Connector provides factory methods to create Remote executor. Each executor is connected to a single SSH hostAddr.
type Connector struct {
// privateKey is the path to the private key file; NewConnector returns an error if this file does not exist.
privateKey string
// timeout is passed to net.Dialer as the dial timeout when opening the TCP connection to the host.
timeout time.Duration
}

// NewConnector creates a new Connector for a given private key file and connection timeout.
func NewConnector(privateKey string) (res *Connector, err error) {
res = &Connector{privateKey: privateKey}
func NewConnector(privateKey string, timeout time.Duration) (res *Connector, err error) {
res = &Connector{privateKey: privateKey, timeout: timeout}
if _, err := os.Stat(privateKey); os.IsNotExist(err) {
return nil, fmt.Errorf("private key file %q does not exist", privateKey)
}
Expand All @@ -42,7 +44,7 @@ func (c *Connector) sshClient(ctx context.Context, host, user string) (session *
host += ":22"
}

dialer := net.Dialer{}
dialer := net.Dialer{Timeout: c.timeout}
conn, err := dialer.DialContext(ctx, "tcp", host)
if err != nil {
return nil, fmt.Errorf("failed to dial: %w", err)
Expand Down
9 changes: 5 additions & 4 deletions app/executor/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executor
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -13,27 +14,27 @@ func TestConnector_Connect(t *testing.T) {
defer teardown()

t.Run("good connection", func(t *testing.T) {
c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
defer sess.Close()
})

t.Run("bad user", func(t *testing.T) {
c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
_, err = c.Connect(ctx, hostAndPort, "h1", "test33")
require.ErrorContains(t, err, "ssh: unable to authenticate")
})

t.Run("bad key", func(t *testing.T) {
_, err := NewConnector("testdata/test_ssh_key33")
_, err := NewConnector("testdata/test_ssh_key33", time.Second*10)
require.ErrorContains(t, err, "private key file \"testdata/test_ssh_key33\" does not exist", "test")
})

t.Run("wrong port", func(t *testing.T) {
c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
_, err = c.Connect(ctx, "127.0.0.1:12345", "h1", "test")
require.ErrorContains(t, err, "failed to dial: dial tcp 127.0.0.1:12345")
Expand Down
18 changes: 9 additions & 9 deletions app/executor/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestExecuter_UploadAndDownload(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)

sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
Expand Down Expand Up @@ -49,7 +49,7 @@ func TestExecuter_Upload_FailedNoRemoteDir(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
Expand All @@ -65,7 +65,7 @@ func TestExecuter_Upload_CantMakeRemoteDir(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
Expand All @@ -81,7 +81,7 @@ func TestExecuter_Upload_Canceled(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
Expand All @@ -98,7 +98,7 @@ func TestExecuter_UploadCanceledWithoutMkdir(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
Expand All @@ -116,7 +116,7 @@ func TestExecuter_ConnectCanceled(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
_, err = c.Connect(ctx, hostAndPort, "h1", "test")
assert.ErrorContains(t, err, "failed to dial: dial tcp: lookup localhost: i/o timeout")
Expand All @@ -128,7 +128,7 @@ func TestExecuter_Run(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestExecuter_Sync(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestExecuter_Delete(t *testing.T) {
hostAndPort, teardown := startTestContainer(t)
defer teardown()

c, err := NewConnector("testdata/test_ssh_key")
c, err := NewConnector("testdata/test_ssh_key", time.Second*10)
require.NoError(t, err)
sess, err := c.Connect(ctx, hostAndPort, "h1", "test")
require.NoError(t, err)
Expand Down
19 changes: 11 additions & 8 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/fatih/color"
"github.com/go-pkgz/lgr"
"github.com/hashicorp/go-multierror"
"github.com/jessevdk/go-flags"

"github.com/umputun/simplotask/app/config"
Expand All @@ -23,10 +24,11 @@ import (
)

type options struct {
PlaybookFile string `short:"p" long:"file" env:"SPOT_FILE" description:"playbook file" default:"spot.yml"`
TaskName string `short:"t" long:"task" description:"task name"`
Targets []string `short:"d" long:"target" description:"target name" default:"default"`
Concurrent int `short:"c" long:"concurrent" description:"concurrent tasks" default:"1"`
PlaybookFile string `short:"p" long:"file" env:"SPOT_FILE" description:"playbook file" default:"spot.yml"`
TaskName string `short:"t" long:"task" description:"task name"`
Targets []string `short:"d" long:"target" description:"target name" default:"default"`
Concurrent int `short:"c" long:"concurrent" description:"concurrent tasks" default:"1"`
SSHTimeout time.Duration `long:"ssh-timeout" description:"ssh timeout" default:"30s"`

// target overrides
Filter []string `short:"f" long:"filter" description:"filter target hosts"`
Expand Down Expand Up @@ -72,7 +74,7 @@ func main() {
if opts.Dbg {
log.Panicf("[ERROR] %v", err)
}
fmt.Printf("failed: %v\n", err)
fmt.Printf("failed, %v", err)
os.Exit(1)
}
}
Expand Down Expand Up @@ -102,7 +104,7 @@ func run(opts options) error {
}
}

connector, err := executor.NewConnector(sshKey(opts, conf))
connector, err := executor.NewConnector(sshKey(opts, conf), opts.SSHTimeout)
if err != nil {
return fmt.Errorf("can't create connector: %w", err)
}
Expand All @@ -116,14 +118,15 @@ func run(opts options) error {
Verbose: opts.Verbose,
}

errs := new(multierror.Error)
if opts.AdHocCmd != "" { // run ad-hoc command
r.Verbose = true // always verbose for ad-hoc
for _, targetName := range opts.Targets {
if err := runTaskForTarget(ctx, r, "ad-hoc", targetName); err != nil {
return err
errs = multierror.Append(errs, err)
}
}
return nil
return errs.ErrorOrNil()
}

if opts.TaskName != "" { // run single task
Expand Down
2 changes: 1 addition & 1 deletion app/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func Test_runFailed(t *testing.T) {
}
setupLog(true)
err := run(opts)
assert.ErrorContains(t, err, `can't run command "show content"`)
assert.ErrorContains(t, err, `failed command "show content"`)
}

func Test_runNoConfig(t *testing.T) {
Expand Down
15 changes: 10 additions & 5 deletions app/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Process struct {
Only []string
}

// Connector is an interface for connecting to a hostAddr, and returning remote executer.
// Connector is an interface for connecting to a host, and returning remote executer.
type Connector interface {
// Connect takes the host address, the host name and the user to connect as,
// and returns a remote executor or an error.
Connect(ctx context.Context, hostAddr, hostName, user string) (*executor.Remote, error)
}
Expand All @@ -45,7 +45,8 @@ type ProcStats struct {
Hosts int
}

// Run runs a task for a set of target hosts. Runs in parallel with limited concurrency, each hostAddr is processed in separate goroutine.
// Run runs a task for a set of target hosts. Runs in parallel with limited concurrency,
// each host is processed in separate goroutine.
func (p *Process) Run(ctx context.Context, task, target string) (s ProcStats, err error) {
tsk, err := p.Config.Task(task)
if err != nil {
Expand All @@ -68,6 +69,10 @@ func (p *Process) Run(ctx context.Context, task, target string) (s ProcStats, er
if i == 0 {
atomic.AddInt32(&commands, int32(count))
}
if e != nil {
_, errLog := executor.MakeOutAndErrWriters(fmt.Sprintf("%s:%d", host.Host, host.Port), host.Name, p.Verbose)
errLog.Write([]byte(e.Error())) //nolint
}
return e
})
}
Expand All @@ -93,7 +98,7 @@ func (p *Process) Run(ctx context.Context, task, target string) (s ProcStats, er
return ProcStats{Hosts: len(targetHosts), Commands: int(atomic.LoadInt32(&commands))}, err
}

// runTaskOnHost executes all commands of a task on a target hostAddr. hostAddr can be a remote hostAddr or localhost with port.
// runTaskOnHost executes all commands of a task on a target host. hostAddr can be a remote host or localhost with port.
func (p *Process) runTaskOnHost(ctx context.Context, tsk *config.Task, hostAddr, hostName, user string) (int, error) {
contains := func(list []string, s string) bool {
for _, v := range list {
Expand Down Expand Up @@ -124,7 +129,7 @@ func (p *Process) runTaskOnHost(ctx context.Context, tsk *config.Task, hostAddr,
continue
}

log.Printf("[INFO] run command %q on hostAddr %s (%s)", cmd.Name, hostAddr, hostName)
log.Printf("[INFO] run command %q on host %q (%s)", cmd.Name, hostAddr, hostName)
st := time.Now()
params := execCmdParams{cmd: cmd, hostAddr: hostAddr, tsk: tsk, exec: remote}
if cmd.Options.Local {
Expand All @@ -134,7 +139,7 @@ func (p *Process) runTaskOnHost(ctx context.Context, tsk *config.Task, hostAddr,
details, err := p.execCommand(ctx, params)
if err != nil {
if !cmd.Options.IgnoreErrors {
return count, fmt.Errorf("can't run command %q on hostAddr %s (%s): %w", cmd.Name, hostAddr, hostName, err)
return count, fmt.Errorf("failed command %q on host %s (%s): %w", cmd.Name, hostAddr, hostName, err)
}

fmt.Fprintf(p.ColorWriter.WithHost(hostAddr, hostName), "failed %s%s (%v)",
Expand Down
Loading

0 comments on commit 04b0fe4

Please sign in to comment.