Skip to content

Commit

Permalink
reworking runtime interface
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Aug 5, 2023
1 parent 32eb15d commit 27c747a
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 26 deletions.
68 changes: 47 additions & 21 deletions runtime/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@ package runtime
import (
"context"
"io"
"log"
"os"
"strings"
"sync"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/tork/task"
)

type DockerRuntime struct {
client *client.Client
tasks map[string]string
mu sync.RWMutex
}

func NewDockerRuntime() (*DockerRuntime, error) {
Expand All @@ -27,19 +30,20 @@ func NewDockerRuntime() (*DockerRuntime, error) {
return &DockerRuntime{
client: dc,
tasks: make(map[string]string),
mu: sync.RWMutex{},
}, nil
}

func (d *DockerRuntime) Start(ctx context.Context, t task.Task) error {
func (d *DockerRuntime) Run(ctx context.Context, t task.Task) (string, error) {
reader, err := d.client.ImagePull(
ctx, t.Image, types.ImagePullOptions{})
if err != nil {
log.Printf("Error pulling image %s: %v\n", t.Image, err)
return err
log.Error().Err(err).Msgf("Error pulling image %s: %v\n", t.Image, err)
return "", err
}
_, err = io.Copy(os.Stdout, reader)
if err != nil {
return err
return "", err
}

rp := container.RestartPolicy{
Expand All @@ -65,49 +69,71 @@ func (d *DockerRuntime) Start(ctx context.Context, t task.Task) error {
resp, err := d.client.ContainerCreate(
ctx, &cc, &hc, nil, nil, t.ID)
if err != nil {
log.Printf(
log.Error().Msgf(
"Error creating container using image %s: %v\n",
t.Image, err,
)
return err
return "", err
}

d.mu.Lock()
d.tasks[t.ID] = resp.ID
d.mu.Unlock()

err = d.client.ContainerStart(
ctx, resp.ID, types.ContainerStartOptions{})
if err != nil {
return errors.Wrapf(err, "error starting container %s: %v\n", resp.ID, err)
return "", errors.Wrapf(err, "error starting container %s: %v\n", resp.ID, err)
}

out, err := d.client.ContainerLogs(
ctx,
resp.ID,
types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true},
)
defer func() {
if err := out.Close(); err != nil {
log.Error().Err(err).Msgf("error closing stdout on container %s", resp.ID)
}
}()
if err != nil {
return errors.Wrapf(err, "error getting logs for container %s: %v\n", resp.ID, err)
return "", errors.Wrapf(err, "error getting logs for container %s: %v\n", resp.ID, err)
}

_, err = stdcopy.StdCopy(os.Stdout, os.Stderr, out)
// limit the amount of data read from stdout to prevent memory exhaustion
lr := &io.LimitedReader{R: out, N: 1024}
buf := new(strings.Builder)
_, err = stdcopy.StdCopy(buf, buf, lr)
if err != nil {
return errors.Wrapf(err, "error reading the std out")
return "", errors.Wrapf(err, "error reading the std out")
}

d.tasks[t.ID] = resp.ID
statusCh, errCh := d.client.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)

return nil
select {
case err := <-errCh:
if err != nil {
return "", err
}
case status := <-statusCh:
log.Debug().Msgf("status.StatusCode: %#+v\n", status.StatusCode)
}

return buf.String(), nil
}

func (d *DockerRuntime) Stop(ctx context.Context, t task.Task) error {
func (d *DockerRuntime) Cancel(ctx context.Context, t task.Task) error {
d.mu.RLock()
containerID, ok := d.tasks[t.ID]
d.mu.RUnlock()
if !ok {
return nil
}
log.Printf("Attempting to stop container %v", containerID)
err := d.client.ContainerStop(ctx, containerID, container.StopOptions{})
if err != nil {
return err
}
err = d.client.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{RemoveVolumes: true, RemoveLinks: false, Force: false})
log.Printf("Attempting to stop and remove container %v", containerID)
err := d.client.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{
RemoveVolumes: true,
RemoveLinks: false,
Force: true,
})
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ const (

// Runtime is the actual runtime environment that executes a task.
type Runtime interface {
Start(ctx context.Context, t task.Task) error
Stop(ctx context.Context, t task.Task) error
Run(ctx context.Context, t task.Task) (string, error)
Cancel(ctx context.Context, t task.Task) error
}
6 changes: 3 additions & 3 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ func NewWorker(cfg Config) *Worker {
return w
}

func (w *Worker) startTask(ctx context.Context, t task.Task) error {
func (w *Worker) runTask(ctx context.Context, t task.Task) error {
if t.State != task.Pending {
return errors.Errorf("can't start a task in %s state", t.State)
}
err := w.runtime.Start(ctx, t)
_, err := w.runtime.Run(ctx, t)
if err != nil {
log.Printf("error running task %v: %v\n", t.ID, err)
return err
Expand All @@ -64,7 +64,7 @@ func (w *Worker) collectStats() {

func (w *Worker) Start() error {
log.Info().Msgf("starting %s", w.Name)
err := w.broker.Subscribe(w.Name, w.startTask)
err := w.broker.Subscribe(w.Name, w.runTask)
if err != nil {
return errors.Wrapf(err, "error subscribing for queue: %s", w.Name)
}
Expand Down

0 comments on commit 27c747a

Please sign in to comment.