Skip to content

Commit

Permalink
Feature: Scheduled job status
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Dec 13, 2023
1 parent 4cf3cb6 commit 3354709
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 17 deletions.
2 changes: 1 addition & 1 deletion internal/coordinator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (s *API) cancelJob(c echo.Context) error {
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, err.Error())
}
if j.State != tork.JobStateRunning {
if j.State != tork.JobStateRunning && j.State != tork.JobStateScheduled {
return echo.NewHTTPError(http.StatusBadRequest, "job is not running")
}
j.State = tork.JobStateCancelled
Expand Down
68 changes: 68 additions & 0 deletions internal/coordinator/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,74 @@ func Test_cancelRunningJob(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
}

// Test_cancelScheduledJob verifies that a job in the SCHEDULED state can be
// cancelled through the PUT /jobs/{id}/cancel endpoint. The job is seeded with
// one task in each of the PENDING, SCHEDULED, RUNNING, CANCELLED, COMPLETED
// and FAILED states before the request is issued, and the endpoint is expected
// to answer 200 with an OK status body.
func Test_cancelScheduledJob(t *testing.T) {
	ctx := context.Background()
	ds := datastore.NewInMemoryDatastore()
	j1 := tork.Job{
		ID:        uuid.NewUUID(),
		State:     tork.JobStateScheduled,
		CreatedAt: time.Now().UTC(),
	}
	err := ds.CreateJob(ctx, &j1)
	assert.NoError(t, err)

	now := time.Now().UTC()

	tasks := []tork.Task{{
		ID:        uuid.NewUUID(),
		State:     tork.TaskStatePending,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     tork.TaskStateScheduled,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     tork.TaskStateRunning,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     tork.TaskStateCancelled,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     tork.TaskStateCompleted,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     tork.TaskStateFailed,
		CreatedAt: &now,
		JobID:     j1.ID,
	}}

	// Index into the slice instead of taking the address of the range
	// variable: before Go 1.22 that variable is shared by every iteration,
	// so a callee that retains the pointer would see all tasks alias the
	// last element.
	for i := range tasks {
		err := ds.CreateTask(ctx, &tasks[i])
		assert.NoError(t, err)
	}

	api, err := NewAPI(Config{
		DataStore: ds,
		Broker:    mq.NewInMemoryBroker(),
	})
	assert.NoError(t, err)
	assert.NotNil(t, api)

	req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/jobs/%s/cancel", j1.ID), nil)
	assert.NoError(t, err)
	w := httptest.NewRecorder()
	api.server.Handler.ServeHTTP(w, req)
	body, err := io.ReadAll(w.Body)
	assert.NoError(t, err)

	assert.Equal(t, "{\"status\":\"OK\"}\n", string(body))
	assert.Equal(t, http.StatusOK, w.Code)
}

func Test_restartJob(t *testing.T) {
ctx := context.Background()
ds := datastore.NewInMemoryDatastore()
Expand Down
6 changes: 5 additions & 1 deletion internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ func NewCoordinator(cfg Config) (*Coordinator, error) {
)

onStarted := task.ApplyMiddleware(
handlers.NewStartedHandler(cfg.DataStore, cfg.Broker),
handlers.NewStartedHandler(
cfg.DataStore,
cfg.Broker,
cfg.Middleware.Job...,
),
cfg.Middleware.Task,
)

Expand Down
4 changes: 2 additions & 2 deletions internal/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestJobMiddlewareNoOp(t *testing.T) {
j2, err := ds.GetJobByID(context.Background(), j.ID)
assert.NoError(t, err)

assert.Equal(t, tork.JobStateRunning, j2.State)
assert.Equal(t, tork.JobStateScheduled, j2.State)
}

func TestNodeMiddlewareModify(t *testing.T) {
Expand Down Expand Up @@ -367,7 +367,7 @@ func doRunJob(t *testing.T, filename string) *tork.Job {
assert.NoError(t, err)

iter := 0
for j2.State == tork.JobStateRunning && iter < 10 {
for (j2.State == tork.JobStateRunning || j2.State == tork.JobStateScheduled) && iter < 10 {
time.Sleep(time.Second)
j2, err = ds.GetJobByID(ctx, j2.ID)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/coordinator/handlers/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewCancelHandler(ds datastore.Datastore, b mq.Broker) job.HandlerFunc {
func (h *cancelHandler) handle(ctx context.Context, _ job.EventType, j *tork.Job) error {
// mark the job as cancelled
if err := h.ds.UpdateJob(ctx, j.ID, func(u *tork.Job) error {
if u.State != tork.JobStateRunning {
if u.State != tork.JobStateRunning && u.State != tork.JobStateScheduled {
// job is neither running nor scheduled -- nothing to cancel
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/coordinator/handlers/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (h *errorHandler) handle(ctx context.Context, et task.EventType, t *tork.Ta
return errors.Wrapf(err, "error marking task %s as FAILED", t.ID)
}
// eligible for retry?
if j.State == tork.JobStateRunning &&
if (j.State == tork.JobStateRunning || j.State == tork.JobStateScheduled) &&
t.Retry != nil &&
t.Retry.Attempts < t.Retry.Limit {
// create a new retry task
Expand Down
21 changes: 17 additions & 4 deletions internal/coordinator/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ func (h *jobHandler) handle(ctx context.Context, et job.EventType, j *tork.Job)
return h.completeJob(ctx, j)
case tork.JobStateFailed:
return h.failJob(ctx, j)
case tork.JobStateRunning:
return h.markJobAsRunning(ctx, j)
default:
return errors.Errorf("invalud job state: %s", j.State)
return errors.Errorf("invalid job state: %s", j.State)
}
}

Expand All @@ -68,7 +70,7 @@ func (h *jobHandler) startJob(ctx context.Context, j *tork.Job) error {
}
if err := h.ds.UpdateJob(ctx, j.ID, func(u *tork.Job) error {
n := time.Now().UTC()
u.State = tork.JobStateRunning
u.State = tork.JobStateScheduled
u.StartedAt = &n
u.Position = 1
return nil
Expand All @@ -87,7 +89,7 @@ func (h *jobHandler) startJob(ctx context.Context, j *tork.Job) error {
func (h *jobHandler) completeJob(ctx context.Context, j *tork.Job) error {
// mark the job as completed
if err := h.ds.UpdateJob(ctx, j.ID, func(u *tork.Job) error {
if u.State != tork.JobStateRunning {
if u.State != tork.JobStateRunning && u.State != tork.JobStateScheduled {
return errors.Errorf("job %s is %s and can not be completed", u.ID, u.State)
}
now := time.Now().UTC()
Expand Down Expand Up @@ -137,6 +139,17 @@ func (h *jobHandler) completeJob(ctx context.Context, j *tork.Job) error {
}
}

// markJobAsRunning promotes the stored job identified by j.ID from SCHEDULED
// to RUNNING and clears its FailedAt timestamp. If the stored job is in any
// other state the update is a no-op and no error is reported.
func (h *jobHandler) markJobAsRunning(ctx context.Context, j *tork.Job) error {
	promote := func(u *tork.Job) error {
		// only a job that is still SCHEDULED may be moved to RUNNING
		if u.State == tork.JobStateScheduled {
			u.State = tork.JobStateRunning
			u.FailedAt = nil
		}
		return nil
	}
	return h.ds.UpdateJob(ctx, j.ID, promote)
}

func (h *jobHandler) restartJob(ctx context.Context, j *tork.Job) error {
// mark the job as running
if err := h.ds.UpdateJob(ctx, j.ID, func(u *tork.Job) error {
Expand Down Expand Up @@ -174,7 +187,7 @@ func (h *jobHandler) failJob(ctx context.Context, j *tork.Job) error {
// we only want to mark the job as FAILED
// if it's still active (RUNNING or SCHEDULED)
// as opposed to possibly being CANCELLED
if u.State == tork.JobStateRunning {
if u.State == tork.JobStateRunning || u.State == tork.JobStateScheduled {
u.State = tork.JobStateFailed
u.FailedAt = j.FailedAt
}
Expand Down
6 changes: 3 additions & 3 deletions internal/coordinator/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Test_handleJobs(t *testing.T) {

j2, err := ds.GetJobByID(ctx, j1.ID)
assert.NoError(t, err)
assert.Equal(t, tork.JobStateRunning, j2.State)
assert.Equal(t, tork.JobStateScheduled, j2.State)
}

func Test_handleCancelJob(t *testing.T) {
Expand Down Expand Up @@ -103,7 +103,7 @@ func Test_handleCancelJob(t *testing.T) {

j2, err := ds.GetJobByID(ctx, j1.ID)
assert.NoError(t, err)
assert.Equal(t, tork.JobStateRunning, j2.State)
assert.Equal(t, tork.JobStateScheduled, j2.State)

j1.State = tork.JobStateCancelled
// cancel the job
Expand Down Expand Up @@ -146,7 +146,7 @@ func Test_handleRestartJob(t *testing.T) {

j2, err := ds.GetJobByID(ctx, j1.ID)
assert.NoError(t, err)
assert.Equal(t, tork.JobStateRunning, j2.State)
assert.Equal(t, tork.JobStateScheduled, j2.State)

// cancel the job
j1.State = tork.JobStateCancelled
Expand Down
16 changes: 14 additions & 2 deletions internal/coordinator/handlers/started.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ import (
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/middleware/job"
"github.com/runabol/tork/middleware/task"
"github.com/runabol/tork/mq"
)

// startedHandler handles task events reporting that a task has started.
type startedHandler struct {
// ds is the datastore used to look up nodes and update tasks.
ds datastore.Datastore
// broker is used to publish a task back to its node's queue when the
// task has to be cancelled.
broker mq.Broker
// onJob is the job handler (wrapped with the configured job middleware)
// invoked with a job.StateChange event when a SCHEDULED job is switched
// to RUNNING.
onJob job.HandlerFunc
}

func NewStartedHandler(ds datastore.Datastore, b mq.Broker) task.HandlerFunc {
// NewStartedHandler builds the task handler invoked when a task has started.
// The optional mw job middleware is applied around the job handler created
// by NewJobHandler; that wrapped handler is what the returned task handler
// uses to emit job state changes.
func NewStartedHandler(ds datastore.Datastore, b mq.Broker, mw ...job.MiddlewareFunc) task.HandlerFunc {
	jobHandler := job.ApplyMiddleware(NewJobHandler(ds, b), mw)
	started := &startedHandler{
		ds:     ds,
		broker: b,
		onJob:  jobHandler,
	}
	return started.handle
}
Expand All @@ -35,14 +38,23 @@ func (h *startedHandler) handle(ctx context.Context, et task.EventType, t *tork.
}
// if the job isn't running anymore we need
// to cancel the task
if j.State != tork.JobStateRunning {
if j.State != tork.JobStateRunning && j.State != tork.JobStateScheduled {
t.State = tork.TaskStateCancelled
node, err := h.ds.GetNodeByID(ctx, t.NodeID)
if err != nil {
return err
}
return h.broker.PublishTask(ctx, node.Queue, t)
}
// if this is the first task that started
// we want to switch the state of the job
// from SCHEDULED to RUNNING
if j.State == tork.JobStateScheduled {
j.State = tork.JobStateRunning
if err := h.onJob(ctx, job.StateChange, j); err != nil {
return nil
}
}
return h.ds.UpdateTask(ctx, t.ID, func(u *tork.Task) error {
// we don't want to mark the task as RUNNING
// if an out-of-order task completion/failure
Expand Down
7 changes: 6 additions & 1 deletion internal/coordinator/handlers/started_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Test_handleStartedTask(t *testing.T) {

j1 := &tork.Job{
ID: uuid.NewUUID(),
State: tork.JobStateRunning,
State: tork.JobStateScheduled,
}
err := ds.CreateJob(ctx, j1)
assert.NoError(t, err)
Expand All @@ -49,6 +49,11 @@ func Test_handleStartedTask(t *testing.T) {
assert.Equal(t, tork.TaskStateRunning, t2.State)
assert.Equal(t, t1.StartedAt, t2.StartedAt)
assert.Equal(t, t1.NodeID, t2.NodeID)

j2, err := ds.GetJobByID(ctx, j1.ID)
assert.NoError(t, err)

assert.Equal(t, tork.JobStateRunning, j2.State)
}

func Test_handleStartedTaskOfFailedJob(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/coordinator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *Scheduler) scheduleRegularTask(ctx context.Context, t *tork.Task) error
if t.Queue == "" {
t.Queue = mq.QUEUE_DEFAULT
}
// mark job state as scheduled
// mark task state as scheduled
t.State = tork.TaskStateScheduled
t.ScheduledAt = &now
if err := s.ds.UpdateTask(ctx, t.ID, func(u *tork.Task) error {
Expand Down
1 change: 1 addition & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type JobState string

const (
JobStatePending JobState = "PENDING"
JobStateScheduled JobState = "SCHEDULED"
JobStateRunning JobState = "RUNNING"
JobStateCancelled JobState = "CANCELLED"
JobStateCompleted JobState = "COMPLETED"
Expand Down

0 comments on commit 3354709

Please sign in to comment.