feature(worker): when SIGINT/SIGTERM received, stop all the jobs and update their status before quitting. Close tuna#19.
bigeagle committed May 2, 2016
1 parent 2d2df65 commit 76ad3d4
Showing 3 changed files with 57 additions and 5 deletions.
cmd/tunasync/tunasync.go: 7 changes (5 additions, 2 deletions)
@@ -61,8 +61,9 @@ func startWorker(c *cli.Context) {
time.Sleep(1 * time.Second)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
for {
s := <-sigChan
signal.Notify(sigChan, syscall.SIGINT)
signal.Notify(sigChan, syscall.SIGTERM)
for s := range sigChan {
switch s {
case syscall.SIGHUP:
logger.Info("Received reload signal")
@@ -71,6 +72,8 @@ func startWorker(c *cli.Context) {
logger.Errorf("Error loading config: %s", err.Error())
}
w.ReloadMirrorConfig(newCfg.Mirrors)
case syscall.SIGINT, syscall.SIGTERM:
w.Halt()
}
}
}()
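The change above extends the existing SIGHUP reload loop so that SIGINT and SIGTERM trigger w.Halt(), and replaces the bare channel receive with a range over the signal channel. For reference, a minimal self-contained sketch of the same pattern; the reload and halt callbacks here are placeholders standing in for ReloadMirrorConfig and Halt, not the actual tunasync code:

package main

import (
    "os"
    "os/signal"
    "syscall"
)

// watchSignals blocks, dispatching each incoming signal:
// SIGHUP triggers a config reload, SIGINT/SIGTERM a graceful halt.
func watchSignals(reload, halt func()) {
    sigChan := make(chan os.Signal, 1)
    // a single Notify call may register several signals at once
    signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
    for s := range sigChan {
        switch s {
        case syscall.SIGHUP:
            reload()
        case syscall.SIGINT, syscall.SIGTERM:
            halt()
            return // in the sketch we simply stop watching after a halt
        }
    }
}

func main() {
    watchSignals(
        func() { println("reload config") },
        func() { println("halting jobs") },
    )
}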
worker/job.go: 17 changes (15 additions, 2 deletions)
@@ -3,6 +3,7 @@ package worker
import (
"errors"
"fmt"
"sync"
"sync/atomic"

tunasync "github.com/tuna/tunasync/internal"
@@ -18,6 +19,7 @@ const (
jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing
jobPing // ensure the goroutine is alive
jobHalt // worker halts
)

type jobMessage struct {
@@ -36,8 +38,14 @@ const (
statePaused
// disabled by jobDisable
stateDisabled
// worker is halting
stateHalting
)

// used to ensure all jobs are finished before the worker exits
var jobsDone sync.WaitGroup

type mirrorJob struct {
provider mirrorProvider
ctrlChan chan ctrlAction
@@ -82,11 +90,11 @@ func (m *mirrorJob) SetProvider(provider mirrorProvider) error {
// semaphore: make sure the concurrently running sync jobs won't explode
// TODO: message struct for managerChan
func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) error {

jobsDone.Add(1)
m.disabled = make(chan empty)
defer func() {
close(m.disabled)
m.SetState(stateDisabled)
jobsDone.Done()
}()

provider := m.provider
@@ -244,6 +252,11 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
case jobStart:
m.SetState(stateReady)
goto _wait_for_job
case jobHalt:
m.SetState(stateHalting)
close(kill)
<-jobDone
return nil
default:
// TODO: implement this
close(kill)
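Two pieces cooperate here: the new jobHalt control action makes a running job close its kill channel and wait for jobDone, and the package-level jobsDone WaitGroup lets the worker block until every job goroutine has unwound. A minimal sketch of that WaitGroup handshake, with made-up job names and a fake workload standing in for the real sync providers; note that in the sketch Add is called before each goroutine starts, which avoids racing against Wait:

package main

import (
    "fmt"
    "sync"
    "time"
)

// jobsDone counts running jobs so the coordinator can wait for all of them.
var jobsDone sync.WaitGroup

func runJob(name string, halt <-chan struct{}) {
    defer jobsDone.Done() // always signal completion, even on early return
    select {
    case <-halt:
        fmt.Println(name, "stopped by halt request")
    case <-time.After(2 * time.Second):
        fmt.Println(name, "finished normally")
    }
}

func main() {
    halt := make(chan struct{})
    for _, name := range []string{"debian", "ubuntu", "archlinux"} {
        jobsDone.Add(1) // register before the goroutine starts
        go runJob(name, halt)
    }
    close(halt)     // broadcast the stop request, like closing the kill channel
    jobsDone.Wait() // block until every job has called Done
    fmt.Println("all the jobs are stopped")
}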
worker/worker.go: 38 changes (37 additions, 1 deletion)
@@ -20,6 +20,7 @@ type Worker struct {

managerChan chan jobMessage
semaphore chan empty
exit chan empty

schedule *scheduleQueue
httpEngine *gin.Engine
@@ -38,6 +39,7 @@ func GetTUNASyncWorker(cfg *Config) *Worker {

managerChan: make(chan jobMessage, 32),
semaphore: make(chan empty, cfg.Global.Concurrent),
exit: make(chan empty),

schedule: newScheduleQueue(),
}
@@ -222,6 +224,21 @@ func (w *Worker) runHTTPServer() {
}
}

// Halt stops all jobs
func (w *Worker) Halt() {
w.L.Lock()
logger.Notice("Stopping all the jobs")
for _, job := range w.jobs {
if job.State() != stateDisabled {
job.ctrlChan <- jobHalt
}
}
jobsDone.Wait()
logger.Notice("All the jobs are stopped")
w.L.Unlock()
close(w.exit)
}

// Run runs worker forever
func (w *Worker) Run() {
w.registorWorker()
@@ -284,7 +301,7 @@ func (w *Worker) runSchedule() {
continue
}

if job.State() != stateReady {
if (job.State() != stateReady) && (job.State() != stateHalting) {
logger.Infof("Job %s state is not ready, skip adding new schedule", jobMsg.name)
continue
}
@@ -312,6 +329,25 @@ func (w *Worker) runSchedule() {
if job := w.schedule.Pop(); job != nil {
job.ctrlChan <- jobStart
}
case <-w.exit:
// flush status update messages
w.L.Lock()
defer w.L.Unlock()
for {
select {
case jobMsg := <-w.managerChan:
logger.Debugf("status update from %s", jobMsg.name)
job, ok := w.jobs[jobMsg.name]
if !ok {
continue
}
if jobMsg.status == Failed || jobMsg.status == Success {
w.updateStatus(job, jobMsg)
}
default:
return
}
}
}
}
}
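After Halt closes w.exit, runSchedule stops scheduling new work and drains whatever status updates are still queued in managerChan before returning; the select with a default case makes each receive non-blocking, so the loop exits as soon as the channel is empty. A stripped-down sketch of that drain idiom, assuming a buffered channel with no remaining senders:

package main

import "fmt"

// drain reads whatever is already buffered in ch without ever blocking:
// the default case fires as soon as the channel is empty.
func drain(ch <-chan string) {
    for {
        select {
        case msg := <-ch:
            fmt.Println("flushing pending status update:", msg)
        default:
            return
        }
    }
}

func main() {
    ch := make(chan string, 4)
    ch <- "debian: success"
    ch <- "ubuntu: failed"
    drain(ch) // prints both messages, then returns immediately
}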
