Skip to content

Commit

Permalink
implement the timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
z4yx committed May 25, 2020
1 parent e47ba20 commit 3186221
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
20 changes: 15 additions & 5 deletions worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,29 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
}
// Now terminating the provider is feasible

var termErr error
timeout := provider.Timeout()
if timeout <= 0 {
timeout = 100000 * time.Hour // never time out
}
select {
case syncErr = <-syncDone:
logger.Debug("syncing done")
case <-time.After(timeout):
logger.Notice("provider timeout")
stopASAP = true
termErr = provider.Terminate()
syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout)
case <-kill:
logger.Debug("received kill")
stopASAP = true
err := provider.Terminate()
if err != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
return err
}
termErr = provider.Terminate()
syncErr = errors.New("killed by manager")
}
if termErr != nil {
logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error())
return termErr
}

// post-exec hooks
herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec")
Expand Down
39 changes: 39 additions & 0 deletions worker/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestMirrorJob(t *testing.T) {
logDir: tmpDir,
logFile: tmpFile,
interval: 1 * time.Second,
timeout: 7 * time.Second,
}

provider, err := newCmdProvider(c)
Expand All @@ -41,6 +42,7 @@ func TestMirrorJob(t *testing.T) {
So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval)
So(provider.Timeout(), ShouldEqual, c.timeout)

Convey("For a normal mirror job", func(ctx C) {
scriptContent := `#!/bin/bash
Expand Down Expand Up @@ -333,6 +335,43 @@ echo $TUNASYNC_WORKING_DIR
})
})


Convey("When a job timed out", func(ctx C) {
scriptContent := `#!/bin/bash
echo $TUNASYNC_WORKING_DIR
sleep 10
echo $TUNASYNC_WORKING_DIR
`
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)

managerChan := make(chan jobMessage, 10)
semaphore := make(chan empty, 1)
job := newMirrorJob(provider)

Convey("It should be automatically terminated", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart

time.Sleep(1 * time.Second)
msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)

job.ctrlChan <- jobStart // should be ignored

msg = <-managerChan
So(msg.status, ShouldEqual, Failed)

expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir())
loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})
})
})

}
Expand Down

0 comments on commit 3186221

Please sign in to comment.