Skip to content

Commit

Permalink
Fix removing tasks when a jobs service is removed
Browse files Browse the repository at this point in the history
Previously, the jobs orchestrators were missing the appropriate code to
handle the deletion of a service. As a result, when a service was
deleted, the tasks for that service would hang around indefinitely. It's
likely that on a leadership change or restart, the tasks would have been
deleted, but that's generally not an acceptable process as leadership
changes or restarts aren't guaranteed.

Fixes this issue by adding event handling for services, similar to
global or replicated services, that moves all tasks for a deleted
service into Remove state, flagging them for deletion.

Signed-off-by: Drew Erny <[email protected]>
  • Loading branch information
dperny committed Jan 25, 2023
1 parent 904c221 commit 75e563b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
5 changes: 5 additions & 0 deletions manager/orchestrator/jobs/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ func (o *Orchestrator) handleEvent(ctx context.Context, event events.Event) {
service = ev.Service
case api.EventUpdateService:
service = ev.Service
case api.EventDeleteService:
if orchestrator.IsReplicatedJob(ev.Service) || orchestrator.IsGlobalJob(ev.Service) {
orchestrator.SetServiceTasksRemove(ctx, o.store, ev.Service)
o.restartSupervisor.ClearServiceHistory(ev.Service.ID)
}
case api.EventUpdateTask:
task = ev.Task
}
Expand Down
83 changes: 83 additions & 0 deletions manager/orchestrator/jobs/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,89 @@ var _ = Describe("Replicated job orchestrator", func() {
))
})

When("a service is deleted", func() {
BeforeEach(func() {
err := s.Update(func(tx store.Tx) error {
service := &api.Service{
ID: "serviceDelete",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "serviceDelete",
},
Mode: &api.ServiceSpec_ReplicatedJob{
ReplicatedJob: &api.ReplicatedJob{
MaxConcurrent: 1,
TotalCompletions: 1,
},
},
},
}

if err := store.CreateService(tx, service); err != nil {
return err
}

// create some tasks, like the service was actually running
task1 := &api.Task{
ID: "task1",
ServiceID: "serviceDelete",
DesiredState: api.TaskStateCompleted,
Status: api.TaskStatus{
State: api.TaskStateCompleted,
},
}

task2 := &api.Task{
ID: "task2",
ServiceID: "serviceDelete",
DesiredState: api.TaskStateCompleted,
Status: api.TaskStatus{
State: api.TaskStateRunning,
},
}

if err := store.CreateTask(tx, task1); err != nil {
return err
}
return store.CreateTask(tx, task2)
})

Expect(err).NotTo(HaveOccurred())

// wait for a pass through the reconciler
Eventually(replicated.getServicesReconciled).Should(ConsistOf(
"serviceDelete",
))
})

It("should remove tasks when a service is deleted", func() {
err := s.Update(func(tx store.Tx) error {
return store.DeleteService(tx, "serviceDelete")
})
Expect(err).NotTo(HaveOccurred())

Eventually(func() []*api.Task {
var tasks []*api.Task
s.View(func(tx store.ReadTx) {
tasks, _ = store.FindTasks(tx, store.ByServiceID("serviceDelete"))
})
return tasks
}).Should(SatisfyAll(
HaveLen(2),
WithTransform(
func(tasks []*api.Task) []api.TaskState {
states := []api.TaskState{}
for _, task := range tasks {
states = append(states, task.DesiredState)
}
return states
},
ConsistOf(api.TaskStateCompleted, api.TaskStateCompleted),
),
))
})
})

When("receiving task events", func() {
BeforeEach(func() {
service := &api.Service{
Expand Down

0 comments on commit 75e563b

Please sign in to comment.