Skip to content

Commit

Permalink
Minor refactoring of worker
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Sep 22, 2023
1 parent 032c327 commit e83b426
Showing 1 changed file with 36 additions and 20 deletions.
56 changes: 36 additions & 20 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,34 +245,25 @@ func (w *Worker) executeTask(ctx context.Context, t *tork.Task) error {

func (w *Worker) doExecuteTask(ctx context.Context, o *tork.Task) error {
t := o.Clone()
mnt := mount.Mount{Type: mount.TypeTemp, Target: "/tork"}
if err := w.mounter.Mount(ctx, &mnt); err != nil {
// create a mount for the work directory
workmnt, err := w.createWorkdirMount(ctx, t)
if err != nil {
return err
}
defer func() {
if err := w.mounter.Unmount(ctx, &mnt); err != nil {
log.Error().Err(err).Msgf("error unmounting temp dir")
if err := w.mounter.Unmount(ctx, &workmnt); err != nil {
log.Error().Err(err).Msgf("error unmounting work dir")
}
}()
if err := os.WriteFile(path.Join(mnt.Source, "entrypoint"), []byte(t.Run), os.ModePerm); err != nil {
return err
}
// create an empty file for stdout
stdoutFile := "stdout"
if err := os.WriteFile(path.Join(mnt.Source, stdoutFile), []byte{}, os.ModePerm); err != nil {
if err := os.WriteFile(path.Join(workmnt.Source, stdoutFile), []byte{}, os.ModePerm); err != nil {
return err
}
for filename, contents := range t.Files {
if err := os.WriteFile(path.Join(mnt.Source, filename), []byte(contents), os.ModePerm); err != nil {
return err
}
if err := os.Chmod(path.Join(mnt.Source, filename), 0444); err != nil {
return errors.Wrapf(err, "error making file %s read only", filename)
}
}
t.Mounts = append(t.Mounts, mount.Mount{
Type: mount.TypeBind,
Source: mnt.Source,
Target: mnt.Target,
Source: workmnt.Source,
Target: workmnt.Target,
})
// set the path for task outputs
if t.Env == nil {
Expand All @@ -290,11 +281,13 @@ func (w *Worker) doExecuteTask(ctx context.Context, o *tork.Task) error {
defer cancel()
rctx = tctx
}
// run the task
if err := w.runtime.Run(rctx, t); err != nil {
return err
}
if _, err := os.Stat(path.Join(mnt.Source, stdoutFile)); err == nil {
contents, err := os.ReadFile(path.Join(mnt.Source, stdoutFile))
// read the stdout file
if _, err := os.Stat(path.Join(workmnt.Source, stdoutFile)); err == nil {
contents, err := os.ReadFile(path.Join(workmnt.Source, stdoutFile))
if err != nil {
return errors.Wrapf(err, "error reading output file")
}
Expand All @@ -303,6 +296,29 @@ func (w *Worker) doExecuteTask(ctx context.Context, o *tork.Task) error {
return nil
}

func (w *Worker) createWorkdirMount(ctx context.Context, t *tork.Task) (mount.Mount, error) {
mnt := mount.Mount{Type: mount.TypeTemp, Target: "/tork"}
if err := w.mounter.Mount(ctx, &mnt); err != nil {
return mount.Mount{}, err
}
if err := os.WriteFile(path.Join(mnt.Source, "entrypoint"), []byte(t.Run), os.ModePerm); err != nil {
return mount.Mount{}, err
}
for filename, contents := range t.Files {
if err := os.WriteFile(path.Join(mnt.Source, filename), []byte(contents), os.ModePerm); err != nil {
return mount.Mount{}, err
}
if err := os.Chmod(path.Join(mnt.Source, filename), 0444); err != nil {
return mount.Mount{}, errors.Wrapf(err, "error making file %s read only", filename)
}
}
return mount.Mount{
Type: mount.TypeBind,
Source: mnt.Source,
Target: mnt.Target,
}, nil
}

func (w *Worker) sendHeartbeats() {
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down

0 comments on commit e83b426

Please sign in to comment.