diff --git a/worker/job.go b/worker/job.go index c77528c..1f89130 100644 --- a/worker/job.go +++ b/worker/job.go @@ -170,19 +170,29 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err } // Now terminating the provider is feasible + var termErr error + timeout := provider.Timeout() + if timeout <= 0 { + timeout = 100000 * time.Hour // never time out + } select { case syncErr = <-syncDone: logger.Debug("syncing done") + case <-time.After(timeout): + logger.Notice("provider timeout") + stopASAP = true + termErr = provider.Terminate() + syncErr = fmt.Errorf("%s timeout after %v", m.Name(), timeout) case <-kill: logger.Debug("received kill") stopASAP = true - err := provider.Terminate() - if err != nil { - logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error()) - return err - } + termErr = provider.Terminate() syncErr = errors.New("killed by manager") } + if termErr != nil { + logger.Errorf("failed to terminate provider %s: %s", m.Name(), err.Error()) + return termErr + } // post-exec hooks herr := runHooks(rHooks, func(h jobHook) error { return h.postExec() }, "post-exec") diff --git a/worker/job_test.go b/worker/job_test.go index 8bce93b..fded5fe 100644 --- a/worker/job_test.go +++ b/worker/job_test.go @@ -31,6 +31,7 @@ func TestMirrorJob(t *testing.T) { logDir: tmpDir, logFile: tmpFile, interval: 1 * time.Second, + timeout: 7 * time.Second, } provider, err := newCmdProvider(c) @@ -41,6 +42,7 @@ func TestMirrorJob(t *testing.T) { So(provider.LogDir(), ShouldEqual, c.logDir) So(provider.LogFile(), ShouldEqual, c.logFile) So(provider.Interval(), ShouldEqual, c.interval) + So(provider.Timeout(), ShouldEqual, c.timeout) Convey("For a normal mirror job", func(ctx C) { scriptContent := `#!/bin/bash @@ -333,6 +335,43 @@ echo $TUNASYNC_WORKING_DIR }) }) + + Convey("When a job timed out", func(ctx C) { + scriptContent := `#!/bin/bash +echo $TUNASYNC_WORKING_DIR +sleep 10 +echo $TUNASYNC_WORKING_DIR + ` + err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755) + So(err, ShouldBeNil) + + managerChan := make(chan jobMessage, 10) + semaphore := make(chan empty, 1) + job := newMirrorJob(provider) + + Convey("It should be automatically terminated", func(ctx C) { + go job.Run(managerChan, semaphore) + job.ctrlChan <- jobStart + + time.Sleep(1 * time.Second) + msg := <-managerChan + So(msg.status, ShouldEqual, PreSyncing) + msg = <-managerChan + So(msg.status, ShouldEqual, Syncing) + + job.ctrlChan <- jobStart // should be ignored + + msg = <-managerChan + So(msg.status, ShouldEqual, Failed) + + expectedOutput := fmt.Sprintf("%s\n", provider.WorkingDir()) + loggedContent, err := ioutil.ReadFile(provider.LogFile()) + So(err, ShouldBeNil) + So(string(loggedContent), ShouldEqual, expectedOutput) + job.ctrlChan <- jobDisable + <-job.disabled + }) + }) }) }