diff --git a/cli/cli.go b/cli/cli.go index bc38e601..18739423 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -4,6 +4,7 @@ import ( "os" "github.com/runabol/tork/internal/logging" + "github.com/runabol/tork/internal/reexec" ucli "github.com/urfave/cli/v2" ) @@ -25,6 +26,9 @@ func New() *CLI { } func (c *CLI) Run() error { + if reexec.Init() { + return nil + } return c.app.Run(os.Args) } diff --git a/conf/conf.go b/conf/conf.go index 56228372..30a9c382 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -55,7 +55,7 @@ func LoadConfig() error { if userConfig != "" { return errors.Errorf(errMsg) } else { - logger.Warn().Msg(errMsg) + logger.Debug().Msg(errMsg) } return nil } diff --git a/configs/sample.config.toml b/configs/sample.config.toml index 5b14ac00..02d7c796 100644 --- a/configs/sample.config.toml +++ b/configs/sample.config.toml @@ -83,3 +83,11 @@ denylist = [] # supports wildcards (*) [mounts.temp] dir = "/tmp" + +[runtime] +type = "docker" # docker | shell + +[runtime.shell] +cmd = ["bash", "-c"] +uid = "1000" # requires running the Tork process as root +gid = "1000" # requires running the Tork process as root diff --git a/engine/worker.go b/engine/worker.go index 0c02979c..b895ccd4 100644 --- a/engine/worker.go +++ b/engine/worker.go @@ -4,12 +4,15 @@ import ( "github.com/pkg/errors" "github.com/runabol/tork/conf" "github.com/runabol/tork/internal/runtime" + "github.com/runabol/tork/internal/runtime/docker" + "github.com/runabol/tork/internal/runtime/shell" "github.com/runabol/tork/internal/worker" "github.com/runabol/tork/mount" ) func (e *Engine) initWorker() error { - rt, err := runtime.NewDockerRuntime() + // init the runtime + rt, err := initRuntime() if err != nil { return err } @@ -50,3 +53,19 @@ func (e *Engine) initWorker() error { e.worker = w return nil } + +func initRuntime() (runtime.Runtime, error) { + runtimeType := conf.StringDefault("runtime.type", "docker") + switch runtimeType { + case "docker": + return docker.NewDockerRuntime() + case "shell": + return shell.NewShellRuntime(shell.Config{ + CMD: conf.Strings("runtime.shell.cmd"), + UID: conf.StringDefault("runtime.shell.uid", shell.DEFAULT_UID), + GID: conf.StringDefault("runtime.shell.gid", shell.DEFAULT_GID), + }), nil + default: + return nil, errors.Errorf("unknown runtime type: %s", runtimeType) + } +} diff --git a/go.mod b/go.mod index 4ad28e7a..a73ba3dd 100644 --- a/go.mod +++ b/go.mod @@ -25,8 +25,10 @@ require ( github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.25.7 golang.org/x/exp v0.0.0-20230807204917-050eac23e9de + golang.org/x/sys v0.12.0 golang.org/x/time v0.3.0 gopkg.in/yaml.v3 v3.0.1 + gotest.tools/v3 v3.5.1 ) require ( @@ -42,6 +44,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect + github.com/google/go-cmp v0.5.9 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/labstack/gommon v0.4.0 // indirect github.com/leodido/go-urn v1.2.4 // indirect @@ -69,8 +72,6 @@ require ( golang.org/x/crypto v0.11.0 // indirect golang.org/x/mod v0.11.0 // indirect golang.org/x/net v0.12.0 // indirect - golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/tools v0.6.0 // indirect - gotest.tools/v3 v3.5.1 // indirect ) diff --git a/input/validate.go b/input/validate.go index a5130583..573bcfdd 100644 --- a/input/validate.go +++ b/input/validate.go @@ -83,7 +83,6 @@ func validateQueue(fl validator.FieldLevel) bool { func taskInputValidation(sl validator.StructLevel) { taskTypeValidation(sl) - regularTaskValidation(sl) compositeTaskValidation(sl) } @@ -149,13 +148,3 @@ func compositeTaskValidation(sl validator.StructLevel) { sl.ReportError(t.Timeout, "timeout", "Timeout", "invalidcompositetask", "") } } - -func regularTaskValidation(sl validator.StructLevel) { - t := sl.Current().Interface().(Task) - if t.Parallel != nil || t.Each != nil || t.SubJob != nil { - return - } - if t.Image == "" { - sl.ReportError(t.Image, "image", "Image", "required", "") - } -} diff --git a/input/validate_test.go b/input/validate_test.go index e9a7cd85..25d83b50 100644 --- a/input/validate_test.go +++ b/input/validate_test.go @@ -118,6 +118,19 @@ func TestValidateJobTaskNoName(t *testing.T) { assert.Error(t, err) } +func TestValidateJobTaskNoImage(t *testing.T) { + j := Job{ + Name: "test job", + Tasks: []Task{ + { + Name: "some task", + }, + }, + } + err := j.Validate() + assert.NoError(t, err) +} + func TestValidateJobTaskRetry(t *testing.T) { j := Job{ Name: "test job", diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 7e973bed..d065d5e9 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -113,6 +113,7 @@ func (c *Cache[V]) Modify(k string, m func(x V) (V, error)) error { v := item.Object v, err := m(v) if err != nil { + item.mu.Unlock() return err } item.Object = v diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go index 97c3e5fd..66890df1 100644 --- a/internal/cache/cache_test.go +++ b/internal/cache/cache_test.go @@ -255,6 +255,10 @@ func TestModify(t *testing.T) { return 0, errors.New("something bad happened") }) assert.Error(t, err) + err = tc.Modify("number", func(x int) (int, error) { + return 17, nil + }) + assert.NoError(t, err) } func TestModifyObjectConcurrently(t *testing.T) { diff --git a/internal/coordinator/coordinator_test.go b/internal/coordinator/coordinator_test.go index 1dd047ef..00887884 100644 --- a/internal/coordinator/coordinator_test.go +++ b/internal/coordinator/coordinator_test.go @@ -15,8 +15,7 @@ import ( "github.com/runabol/tork/mount" "github.com/runabol/tork/mq" - "github.com/runabol/tork/internal/runtime" - + "github.com/runabol/tork/internal/runtime/docker" "github.com/runabol/tork/internal/uuid" "github.com/runabol/tork/internal/worker" "github.com/stretchr/testify/assert" @@ -305,7 +304,7 @@ func doRunJob(t *testing.T, filename string) *tork.Job { assert.NoError(t, c.Stop()) }() - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) mounter, err := mount.NewVolumeMounter() diff --git a/internal/reexec/command_linux.go b/internal/reexec/command_linux.go new file mode 100644 index 00000000..8590eb86 --- /dev/null +++ b/internal/reexec/command_linux.go @@ -0,0 +1,35 @@ +//go:build linux + +package reexec // import "github.com/docker/docker/pkg/reexec" + +import ( + "os/exec" + "syscall" + + "golang.org/x/sys/unix" +) + +// Self returns the path to the current process's binary. +// Returns "/proc/self/exe". +func Self() string { + return "/proc/self/exe" +} + +// Command returns *exec.Cmd which has Path as current binary. Also it setting +// SysProcAttr.Pdeathsig to SIGTERM. +// This will use the in-memory version (/proc/self/exe) of the current binary, +// it is thus safe to delete or replace the on-disk binary (os.Args[0]). +// +// As SysProcAttr.Pdeathsig is set, the signal will be sent to the process when +// the OS thread which created the process dies. It is the caller's +// responsibility to ensure that the creating thread is not terminated +// prematurely. See https://go.dev/issue/27505 for more details. +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + SysProcAttr: &syscall.SysProcAttr{ + Pdeathsig: unix.SIGTERM, + }, + } +} diff --git a/internal/reexec/command_unix.go b/internal/reexec/command_unix.go new file mode 100644 index 00000000..0df5195e --- /dev/null +++ b/internal/reexec/command_unix.go @@ -0,0 +1,23 @@ +//go:build freebsd || darwin + +package reexec // import "github.com/docker/docker/pkg/reexec" + +import ( + "os/exec" +) + +// Self returns the path to the current process's binary. +// Uses os.Args[0]. +func Self() string { + return naiveSelf() +} + +// Command returns *exec.Cmd which has Path as current binary. +// For example if current binary is "docker" at "/usr/bin/", then cmd.Path will +// be set to "/usr/bin/docker". +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + } +} diff --git a/internal/reexec/command_unsupported.go b/internal/reexec/command_unsupported.go new file mode 100644 index 00000000..7175853a --- /dev/null +++ b/internal/reexec/command_unsupported.go @@ -0,0 +1,17 @@ +//go:build !linux && !windows && !freebsd && !darwin +// +build !linux,!windows,!freebsd,!darwin + +package reexec // import "github.com/docker/docker/pkg/reexec" + +import ( + "os/exec" +) + +func Self() string { + return "" +} + +// Command is unsupported on operating systems apart from Linux, Windows, and Darwin. +func Command(args ...string) *exec.Cmd { + return nil +} diff --git a/internal/reexec/command_windows.go b/internal/reexec/command_windows.go new file mode 100644 index 00000000..43822689 --- /dev/null +++ b/internal/reexec/command_windows.go @@ -0,0 +1,21 @@ +package reexec // import "github.com/docker/docker/pkg/reexec" + +import ( + "os/exec" +) + +// Self returns the path to the current process's binary. +// Uses os.Args[0]. +func Self() string { + return naiveSelf() +} + +// Command returns *exec.Cmd which has Path as current binary. +// For example if current binary is "docker.exe" at "C:\", then cmd.Path will +// be set to "C:\docker.exe". +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + } +} diff --git a/internal/reexec/reexec_test.go b/internal/reexec/reexec_test.go new file mode 100644 index 00000000..8aea0431 --- /dev/null +++ b/internal/reexec/reexec_test.go @@ -0,0 +1,52 @@ +package reexec // import "github.com/docker/docker/pkg/reexec" + +import ( + "os" + "os/exec" + "testing" + + "gotest.tools/v3/assert" +) + +func init() { + Register("reexec", func() { + panic("Return Error") + }) + Init() +} + +func TestRegister(t *testing.T) { + defer func() { + if r := recover(); r != nil { + assert.Equal(t, `reexec func already registered under name "reexec"`, r) + } + }() + Register("reexec", func() {}) +} + +func TestCommand(t *testing.T) { + cmd := Command("reexec") + w, err := cmd.StdinPipe() + assert.NilError(t, err, "Error on pipe creation: %v", err) + defer w.Close() + + err = cmd.Start() + assert.NilError(t, err, "Error on re-exec cmd: %v", err) + err = cmd.Wait() + assert.Error(t, err, "exit status 2") +} + +func TestNaiveSelf(t *testing.T) { + if os.Getenv("TEST_CHECK") == "1" { + os.Exit(2) + } + cmd := exec.Command(naiveSelf(), "-test.run=TestNaiveSelf") + cmd.Env = append(os.Environ(), "TEST_CHECK=1") + err := cmd.Start() + assert.NilError(t, err, "Unable to start command") + err = cmd.Wait() + assert.Error(t, err, "exit status 2") + + os.Args[0] = "mkdir" + assert.Check(t, naiveSelf() != os.Args[0]) +} diff --git a/internal/reexec/rexec.go b/internal/reexec/rexec.go new file mode 100644 index 00000000..54e934c2 --- /dev/null +++ b/internal/reexec/rexec.go @@ -0,0 +1,51 @@ +// Package reexec facilitates the busybox style reexec of the docker binary that +// we require because of the forking limitations of using Go. Handlers can be +// registered with a name and the argv 0 of the exec of the binary will be used +// to find and execute custom init paths. +package reexec // import "github.com/docker/docker/pkg/reexec" + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" +) + +var registeredInitializers = make(map[string]func()) + +// Register adds an initialization func under the specified name +func Register(name string, initializer func()) { + if _, exists := registeredInitializers[name]; exists { + panic(fmt.Sprintf("reexec func already registered under name %q", name)) + } + + registeredInitializers[name] = initializer +} + +// Init is called as the first part of the exec process and returns true if an +// initialization function was called. +func Init() bool { + initializer, exists := registeredInitializers[os.Args[0]] + if exists { + initializer() + + return true + } + return false +} + +func naiveSelf() string { + name := os.Args[0] + if filepath.Base(name) == name { + if lp, err := exec.LookPath(name); err == nil { + return lp + } + } + // handle conversion of relative paths to absolute + if absName, err := filepath.Abs(name); err == nil { + return absName + } + // if we couldn't get absolute name, return original + // (NOTE: Go only errors on Abs() if os.Getwd fails) + return name +} diff --git a/internal/runtime/docker.go b/internal/runtime/docker/docker.go similarity index 99% rename from internal/runtime/docker.go rename to internal/runtime/docker/docker.go index 3566a9e6..05176b5f 100644 --- a/internal/runtime/docker.go +++ b/internal/runtime/docker/docker.go @@ -1,4 +1,4 @@ -package runtime +package docker import ( "archive/tar" diff --git a/internal/runtime/docker_test.go b/internal/runtime/docker/docker_test.go similarity index 99% rename from internal/runtime/docker_test.go rename to internal/runtime/docker/docker_test.go index d3fff78c..31f4c466 100644 --- a/internal/runtime/docker_test.go +++ b/internal/runtime/docker/docker_test.go @@ -1,4 +1,4 @@ -package runtime +package docker import ( "context" diff --git a/internal/runtime/shell/shell.go b/internal/runtime/shell/shell.go new file mode 100644 index 00000000..e185dac5 --- /dev/null +++ b/internal/runtime/shell/shell.go @@ -0,0 +1,251 @@ +package shell + +import ( + "bufio" + "context" + "flag" + "strconv" + "strings" + "syscall" + + "fmt" + "os" + "os/exec" + + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "github.com/runabol/tork" + "github.com/runabol/tork/internal/reexec" + "github.com/runabol/tork/internal/syncx" +) + +type Rexec func(args ...string) *exec.Cmd + +const ( + DEFAULT_UID = "-" + DEFAULT_GID = "-" + envVarPrefix = "REEXEC_" +) + +func init() { + reexec.Register("shell", reexecRun) +} + +type ShellRuntime struct { + cmds *syncx.Map[string, *exec.Cmd] + shell []string + uid string + gid string + reexec Rexec +} + +type Config struct { + CMD []string + UID string + GID string + Rexec Rexec +} + +func NewShellRuntime(cfg Config) *ShellRuntime { + if len(cfg.CMD) == 0 { + cfg.CMD = []string{"bash", "-c"} + } + if cfg.Rexec == nil { + cfg.Rexec = reexec.Command + } + return &ShellRuntime{ + cmds: new(syncx.Map[string, *exec.Cmd]), + shell: cfg.CMD, + uid: cfg.UID, + gid: cfg.GID, + reexec: cfg.Rexec, + } +} + +func (r *ShellRuntime) Run(ctx context.Context, t *tork.Task) error { + if t.ID == "" { + return errors.New("task id is required") + } + if len(t.Mounts) > 0 { + return errors.New("mounts are not supported on shell runtime") + } + if len(t.Entrypoint) > 0 { + return errors.New("entrypoint is not supported on shell runtime") + } + if t.Image != "" { + return errors.New("image is not supported on shell runtime") + } + if t.Limits != nil && (t.Limits.CPUs != "" || t.Limits.Memory != "") { + return errors.New("limits are not supported on shell runtime") + } + if len(t.Networks) > 0 { + return errors.New("networks are not supported on shell runtime") + } + if t.Registry != nil { + return errors.New("registry is not supported on shell runtime") + } + if len(t.CMD) > 0 { + return errors.New("cmd is not supported on shell runtime") + } + defer r.cmds.Delete(t.ID) + + workdir, err := os.MkdirTemp("", "tork") + if err != nil { + return err + } + defer os.RemoveAll(workdir) + + log.Debug().Msgf("Created workdir %s", workdir) + + if err := os.WriteFile(fmt.Sprintf("%s/stdout", workdir), []byte{}, 0606); err != nil { + return errors.Wrapf(err, "error writing the entrypoint") + } + + for filename, contents := range t.Files { + filename = fmt.Sprintf("%s/%s", workdir, filename) + if err := os.WriteFile(filename, []byte(contents), 0444); err != nil { + return errors.Wrapf(err, "error writing file: %s", filename) + } + } + + env := []string{} + for name, value := range t.Env { + env = append(env, fmt.Sprintf("%s%s=%s", envVarPrefix, name, value)) + } + env = append(env, fmt.Sprintf("%sTORK_OUTPUT=%s/stdout", envVarPrefix, workdir)) + env = append(env, fmt.Sprintf("WORKDIR=%s", workdir)) + env = append(env, fmt.Sprintf("PATH=%s", os.Getenv("PATH"))) + + if err := os.WriteFile(fmt.Sprintf("%s/entrypoint", workdir), []byte(t.Run), 0555); err != nil { + return errors.Wrapf(err, "error writing the entrypoint") + } + args := append(r.shell, fmt.Sprintf("%s/entrypoint", workdir)) + args = append([]string{"shell", "-uid", r.uid, "-gid", r.gid}, args...) + cmd := r.reexec(args...) + cmd.Env = env + cmd.Dir = workdir + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + defer stdout.Close() + cmd.Stderr = cmd.Stdout + + if err := cmd.Start(); err != nil { + return err + } + + r.cmds.Set(t.ID, cmd) + + go func() { + reader := bufio.NewReader(stdout) + line, err := reader.ReadString('\n') + for err == nil { + fmt.Println(line) + line, err = reader.ReadString('\n') + } + }() + + errChan := make(chan error) + doneChan := make(chan any) + go func() { + if err := cmd.Wait(); err != nil { + errChan <- err + return + } + close(doneChan) + }() + select { + case err := <-errChan: + return errors.Wrapf(err, "error executing command") + case <-ctx.Done(): + if err := cmd.Process.Kill(); err != nil { + return errors.Wrapf(err, "error cancelling command") + } + return ctx.Err() + case <-doneChan: + } + + output, err := os.ReadFile(fmt.Sprintf("%s/stdout", workdir)) + if err != nil { + return errors.Wrapf(err, "error reading the task output") + } + + t.Result = string(output) + + return nil +} + +func reexecRun() { + var uid string + var gid string + flag.StringVar(&uid, "uid", "", "the uid to use when running the process") + flag.StringVar(&gid, "gid", "", "the gid to use when running the process") + flag.Parse() + + // set UID + if uid != DEFAULT_UID { + uidi, err := strconv.Atoi(uid) + if err != nil { + log.Fatal().Err(err).Msgf("invalid uid: %s", uid) + } + if err := syscall.Setuid(uidi); err != nil { + log.Fatal().Err(err).Msgf("error setting uid: %s", uid) + } + } + + // set GID + if uid != DEFAULT_GID { + gidi, err := strconv.Atoi(gid) + if err != nil { + log.Fatal().Err(err).Msgf("invalid gid: %s", gid) + } + if err := syscall.Setgid(gidi); err != nil { + log.Fatal().Err(err).Msgf("error setting gid: %s", gid) + } + } + + workdir := os.Getenv("WORKDIR") + if workdir == "" { + log.Fatal().Msg("work dir not set") + } + + env := []string{} + for _, entry := range os.Environ() { + kv := strings.Split(entry, "=") + if len(kv) != 2 { + log.Fatal().Msgf("invalid env var: %s", entry) + } + if strings.HasPrefix(kv[0], envVarPrefix) { + k := strings.TrimPrefix(kv[0], envVarPrefix) + v := kv[1] + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + } + + cmd := exec.Command(flag.Args()[0], flag.Args()[1:]...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = env + cmd.Dir = workdir + + log.Debug().Msgf("reexecing: %s as %s:%s", strings.Join(flag.Args(), " "), uid, gid) + if err := cmd.Run(); err != nil { + log.Fatal().Err(err).Msgf("error reexecing: %s", strings.Join(flag.Args(), " ")) + } +} + +func (r *ShellRuntime) Stop(ctx context.Context, t *tork.Task) error { + proc, ok := r.cmds.Get(t.ID) + if !ok { + return nil + } + if err := proc.Process.Kill(); err != nil { + return errors.Wrapf(err, "error stopping process for task: %s", t.ID) + } + return nil +} + +func (r *ShellRuntime) HealthCheck(ctx context.Context) error { + return nil +} diff --git a/internal/runtime/shell/shell_test.go b/internal/runtime/shell/shell_test.go new file mode 100644 index 00000000..9d495f97 --- /dev/null +++ b/internal/runtime/shell/shell_test.go @@ -0,0 +1,144 @@ +package shell + +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/runabol/tork" + "github.com/runabol/tork/internal/uuid" + "github.com/stretchr/testify/assert" +) + +func TestShellRuntimeRunResult(t *testing.T) { + rt := NewShellRuntime(Config{ + UID: DEFAULT_UID, + GID: DEFAULT_GID, + Rexec: func(args ...string) *exec.Cmd { + cmd := exec.Command(args[5], args[6:]...) + return cmd + }, + }) + + tk := &tork.Task{ + ID: uuid.NewUUID(), + Run: "echo -n hello world > $REEXEC_TORK_OUTPUT", + } + + err := rt.Run(context.Background(), tk) + + assert.NoError(t, err) + assert.Equal(t, "hello world", tk.Result) +} + +func TestShellRuntimeRunFile(t *testing.T) { + rt := NewShellRuntime(Config{ + UID: DEFAULT_UID, + GID: DEFAULT_GID, + Rexec: func(args ...string) *exec.Cmd { + cmd := exec.Command(args[5], args[6:]...) + return cmd + }, + }) + + tk := &tork.Task{ + ID: uuid.NewUUID(), + Run: "cat hello.txt > $REEXEC_TORK_OUTPUT", + Files: map[string]string{ + "hello.txt": "hello world", + }, + } + + err := rt.Run(context.Background(), tk) + + assert.NoError(t, err) + assert.Equal(t, "hello world", tk.Result) +} + +func TestShellRuntimeRunNotSupported(t *testing.T) { + rt := NewShellRuntime(Config{}) + + tk := &tork.Task{ + ID: uuid.NewUUID(), + Run: "echo hello world", + Networks: []string{"some-network"}, + } + + err := rt.Run(context.Background(), tk) + + assert.Error(t, err) +} + +func TestShellRuntimeRunError(t *testing.T) { + rt := NewShellRuntime(Config{ + UID: DEFAULT_UID, + GID: DEFAULT_GID, + Rexec: func(args ...string) *exec.Cmd { + cmd := exec.Command(args[5], args[6:]...) + return cmd + }, + }) + + tk := &tork.Task{ + ID: uuid.NewUUID(), + Run: "no_such_command", + } + + err := rt.Run(context.Background(), tk) + + assert.Error(t, err) +} + +func TestShellRuntimeRunTimeout(t *testing.T) { + rt := NewShellRuntime(Config{ + UID: DEFAULT_UID, + GID: DEFAULT_GID, + Rexec: func(args ...string) *exec.Cmd { + cmd := exec.Command(args[5], args[6:]...) + return cmd + }, + }) + + tk := &tork.Task{ + ID: uuid.NewUUID(), + Run: "sleep 30", + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + + err := rt.Run(ctx, tk) + + assert.Error(t, err) +} + +func TestShellRuntimeStop(t *testing.T) { + rt := NewShellRuntime(Config{ + UID: DEFAULT_UID, + GID: DEFAULT_GID, + Rexec: func(args ...string) *exec.Cmd { + cmd := exec.Command(args[5], args[6:]...) + return cmd + }, + }) + + tk := &tork.Task{ + ID: uuid.NewUUID(), + Run: "sleep 5", + } + + ch := make(chan any) + + go func() { + err := rt.Run(context.Background(), tk) + assert.Error(t, err) + close(ch) + }() + + time.Sleep(time.Second * 1) + + err := rt.Stop(context.Background(), tk) + assert.NoError(t, err) + <-ch +} diff --git a/internal/worker/api_test.go b/internal/worker/api_test.go index 2b1c2bd3..bb515360 100644 --- a/internal/worker/api_test.go +++ b/internal/worker/api_test.go @@ -6,13 +6,13 @@ import ( "net/http/httptest" "testing" - "github.com/runabol/tork/internal/runtime" + "github.com/runabol/tork/internal/runtime/docker" "github.com/runabol/tork/mq" "github.com/stretchr/testify/assert" ) func Test_health(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) api := newAPI(Config{ Broker: mq.NewInMemoryBroker(), diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index d5391f7f..fc4d7845 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/runabol/tork" - "github.com/runabol/tork/internal/runtime" + "github.com/runabol/tork/internal/runtime/docker" "github.com/runabol/tork/internal/uuid" "github.com/runabol/tork/middleware/task" "github.com/runabol/tork/mount" @@ -17,7 +17,7 @@ import ( ) func TestNewWorker(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) w, err := NewWorker(Config{}) assert.Error(t, err) @@ -32,7 +32,7 @@ func TestNewWorker(t *testing.T) { } func TestStart(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) w, err := NewWorker(Config{ @@ -46,7 +46,7 @@ func TestStart(t *testing.T) { } func Test_handleTaskRun(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker() @@ -103,7 +103,7 @@ func Test_handleTaskRun(t *testing.T) { } func Test_handleTaskRunOutput(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker() @@ -136,7 +136,7 @@ func Test_handleTaskRunOutput(t *testing.T) { } func Test_handleTaskRunWithPrePost(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker() @@ -195,7 +195,7 @@ func Test_handleTaskRunWithPrePost(t *testing.T) { } func Test_handleTaskCancel(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker() @@ -248,7 +248,7 @@ func Test_handleTaskCancel(t *testing.T) { } func Test_handleTaskError(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker() @@ -287,7 +287,7 @@ func Test_handleTaskError(t *testing.T) { } func Test_handleTaskOutput(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker() @@ -326,7 +326,7 @@ func Test_handleTaskOutput(t *testing.T) { } func Test_middleware(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker() @@ -379,7 +379,7 @@ func Test_middleware(t *testing.T) { } func Test_sendHeartbeat(t *testing.T) { - rt, err := runtime.NewDockerRuntime() + rt, err := docker.NewDockerRuntime() assert.NoError(t, err) b := mq.NewInMemoryBroker()