Skip to content

Commit

Permalink
Merge pull request tuna#81 from tuna/wip-override-concurrent-limit
Browse files Browse the repository at this point in the history
New feature: run "tunasynctl start" with "-f" to override the limit of concurrent jobs
shankerwangmiao authored May 31, 2018

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 285ffb2 + c750aa1 commit 628266a
Showing 12 changed files with 375 additions and 56 deletions.
12 changes: 11 additions & 1 deletion cmd/tunasynctl/tunasynctl.go
Original file line number Diff line number Diff line change
@@ -285,11 +285,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
"argument WORKER", 1)
}

options := map[string]bool{}
if c.Bool("force") {
options["force"] = true
}
cmd := tunasync.ClientCmd{
Cmd: cmd,
MirrorID: mirrorID,
WorkerID: c.String("worker"),
Args: argsList,
Options: options,
}
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
if err != nil {
@@ -410,6 +415,11 @@ func main() {
},
}

forceStartFlag := cli.BoolFlag{
Name: "force, f",
Usage: "Override the concurrent limit",
}

app.Commands = []cli.Command{
{
Name: "list",
@@ -450,7 +460,7 @@ func main() {
{
Name: "start",
Usage: "Start a job",
Flags: append(commonFlags, cmdFlags...),
Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
},
{
16 changes: 9 additions & 7 deletions internal/msg.go
Original file line number Diff line number Diff line change
@@ -68,9 +68,10 @@ func (c CmdVerb) String() string {
// A WorkerCmd is the command message send from the
// manager to a worker
type WorkerCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
Args []string `json:"args"`
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
Args []string `json:"args"`
Options map[string]bool `json:"options"`
}

func (c WorkerCmd) String() string {
@@ -83,8 +84,9 @@ func (c WorkerCmd) String() string {
// A ClientCmd is the command message send from client
// to the manager
type ClientCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"`
Args []string `json:"args"`
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"`
Args []string `json:"args"`
Options map[string]bool `json:"options"`
}
1 change: 1 addition & 0 deletions manager/server.go
Original file line number Diff line number Diff line change
@@ -337,6 +337,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
Cmd: clientCmd.Cmd,
MirrorID: clientCmd.MirrorID,
Args: clientCmd.Args,
Options: clientCmd.Options,
}

// update job status, even if the job did not disable successfully,
39 changes: 14 additions & 25 deletions worker/base_provider.go
Original file line number Diff line number Diff line change
@@ -20,8 +20,6 @@ type baseProvider struct {
cmd *cmdJob
isRunning atomic.Value

logFile *os.File

cgroup *cgroupHook
zfs *zfsHook
docker *dockerHook
@@ -111,20 +109,21 @@ func (p *baseProvider) Docker() *dockerHook {
return p.docker
}

func (p *baseProvider) prepareLogFile() error {
func (p *baseProvider) prepareLogFile(append bool) error {
if p.LogFile() == "/dev/null" {
p.cmd.SetLogFile(nil)
return nil
}
if p.logFile == nil {
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.logFile = logFile
appendMode := 0
if append {
appendMode = os.O_APPEND
}
p.cmd.SetLogFile(p.logFile)
logFile, err := os.OpenFile(p.LogFile(), os.O_WRONLY|os.O_CREATE|appendMode, 0644)
if err != nil {
logger.Errorf("Error opening logfile %s: %s", p.LogFile(), err.Error())
return err
}
p.cmd.SetLogFile(logFile)
return nil
}

@@ -143,32 +142,22 @@ func (p *baseProvider) IsRunning() bool {

func (p *baseProvider) Wait() error {
defer func() {
p.Lock()
logger.Debugf("set isRunning to false: %s", p.Name())
p.isRunning.Store(false)
if p.logFile != nil {
p.logFile.Close()
p.logFile = nil
}
p.Unlock()
}()
logger.Debugf("calling Wait: %s", p.Name())
return p.cmd.Wait()
}

func (p *baseProvider) Terminate() error {
p.Lock()
defer p.Unlock()
logger.Debugf("terminating provider: %s", p.Name())
if !p.IsRunning() {
return nil
}

p.Lock()
if p.logFile != nil {
p.logFile.Close()
p.logFile = nil
}
p.Unlock()

err := p.cmd.Terminate()
p.isRunning.Store(false)

return err
}
10 changes: 9 additions & 1 deletion worker/cmd_provider.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package worker

import (
"errors"
"time"

"github.com/anmitsu/go-shlex"
@@ -60,6 +61,13 @@ func (p *cmdProvider) Run() error {
}

func (p *cmdProvider) Start() error {
p.Lock()
defer p.Unlock()

if p.IsRunning() {
return errors.New("provider is currently running")
}

env := map[string]string{
"TUNASYNC_MIRROR_NAME": p.Name(),
"TUNASYNC_WORKING_DIR": p.WorkingDir(),
@@ -71,7 +79,7 @@ func (p *cmdProvider) Start() error {
env[k] = v
}
p.cmd = newCmdJob(p, p.command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil {
if err := p.prepareLogFile(false); err != nil {
return err
}

41 changes: 29 additions & 12 deletions worker/job.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

tunasync "github.com/tuna/tunasync/internal"
)
@@ -14,12 +15,13 @@ import (
type ctrlAction uint8

const (
jobStart ctrlAction = iota
jobStop // stop syncing keep the job
jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing
jobPing // ensure the goroutine is alive
jobHalt // worker halts
jobStart ctrlAction = iota
jobStop // stop syncing keep the job
jobDisable // disable the job (stops goroutine)
jobRestart // restart syncing
jobPing // ensure the goroutine is alive
jobHalt // worker halts
jobForceStart // ignore concurrent limit
)

type jobMessage struct {
@@ -154,9 +156,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
syncDone := make(chan error, 1)
go func() {
err := provider.Run()
if !stopASAP {
syncDone <- err
}
syncDone <- err
}()

select {
@@ -212,22 +212,26 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
return nil
}

runJob := func(kill <-chan empty, jobDone chan<- empty) {
runJob := func(kill <-chan empty, jobDone chan<- empty, bypassSemaphore <-chan empty) {
select {
case semaphore <- empty{}:
defer func() { <-semaphore }()
runJobWrapper(kill, jobDone)
case <-bypassSemaphore:
logger.Noticef("Concurrent limit ignored by %s", m.Name())
runJobWrapper(kill, jobDone)
case <-kill:
jobDone <- empty{}
return
}
}

bypassSemaphore := make(chan empty, 1)
for {
if m.State() == stateReady {
kill := make(chan empty)
jobDone := make(chan empty)
go runJob(kill, jobDone)
go runJob(kill, jobDone, bypassSemaphore)

_wait_for_job:
select {
@@ -248,7 +252,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
m.SetState(stateReady)
close(kill)
<-jobDone
time.Sleep(time.Second) // Restart may fail if the process was not exited yet
continue
case jobForceStart:
select { //non-blocking
default:
case bypassSemaphore <- empty{}:
}
fallthrough
case jobStart:
m.SetState(stateReady)
goto _wait_for_job
@@ -272,8 +283,14 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
case jobDisable:
m.SetState(stateDisabled)
return nil
case jobForceStart:
select { //non-blocking
default:
case bypassSemaphore <- empty{}:
}
fallthrough
case jobRestart:
m.SetState(stateReady)
fallthrough
case jobStart:
m.SetState(stateReady)
default:
233 changes: 233 additions & 0 deletions worker/job_test.go
Original file line number Diff line number Diff line change
@@ -135,6 +135,8 @@ echo $TUNASYNC_WORKING_DIR
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)

job.ctrlChan <- jobStart // should be ignored

job.ctrlChan <- jobStop

msg = <-managerChan
@@ -170,8 +172,239 @@ echo $TUNASYNC_WORKING_DIR
job.ctrlChan <- jobDisable
<-job.disabled
})

Convey("If we restart it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart

msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)

job.ctrlChan <- jobRestart

msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")

msg = <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Success)

expectedOutput := fmt.Sprintf(
"%s\n%s\n",
provider.WorkingDir(), provider.WorkingDir(),
)

loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)
job.ctrlChan <- jobDisable
<-job.disabled
})

Convey("If we disable it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart

msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)

job.ctrlChan <- jobDisable

msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")

<-job.disabled
})

Convey("If we stop it twice, than start it", func(ctx C) {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart

msg := <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)

job.ctrlChan <- jobStop

msg = <-managerChan
So(msg.status, ShouldEqual, Failed)
So(msg.msg, ShouldEqual, "killed by manager")

job.ctrlChan <- jobStop // should be ignored

job.ctrlChan <- jobStart

msg = <-managerChan
So(msg.status, ShouldEqual, PreSyncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Syncing)
msg = <-managerChan
So(msg.status, ShouldEqual, Success)

expectedOutput := fmt.Sprintf(
"%s\n%s\n",
provider.WorkingDir(), provider.WorkingDir(),
)

loggedContent, err := ioutil.ReadFile(provider.LogFile())
So(err, ShouldBeNil)
So(string(loggedContent), ShouldEqual, expectedOutput)

job.ctrlChan <- jobDisable
<-job.disabled
})
})

})

}

func TestConcurrentMirrorJobs(t *testing.T) {

InitLogger(true, true, false)

Convey("Concurrent MirrorJobs should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)

const CONCURRENT = 5

var providers [CONCURRENT]*cmdProvider
var jobs [CONCURRENT]*mirrorJob
for i := 0; i < CONCURRENT; i++ {
c := cmdConfig{
name: fmt.Sprintf("job-%d", i),
upstreamURL: "http://mirrors.tuna.moe/",
command: "sleep 2",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
interval: 10 * time.Second,
}

var err error
providers[i], err = newCmdProvider(c)
So(err, ShouldBeNil)
jobs[i] = newMirrorJob(providers[i])
}

managerChan := make(chan jobMessage, 10)
semaphore := make(chan empty, CONCURRENT-2)

countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
counterEnded := 0
counterRunning := 0
peakConcurrent = 0
counterFailed = 0
for counterEnded < totalJobs {
msg := <-managerChan
switch msg.status {
case PreSyncing:
counterRunning++
case Syncing:
case Failed:
counterFailed++
fallthrough
case Success:
counterEnded++
counterRunning--
default:
So(0, ShouldEqual, 1)
}
// Test if semaphore works
So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
if counterRunning > peakConcurrent {
peakConcurrent = counterRunning
}
}
// select {
// case msg := <-managerChan:
// logger.Errorf("extra message received: %v", msg)
// So(0, ShouldEqual, 1)
// case <-time.After(2 * time.Second):
// }
return
}

Convey("When we run them all", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
}

peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)

So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)

for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
Convey("If we cancel one job", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobRestart
time.Sleep(200 * time.Millisecond)
}

// Cancel the one waiting for semaphore
jobs[len(jobs)-1].ctrlChan <- jobStop

peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)

So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)

for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
Convey("If we override the concurrent limit", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(200 * time.Millisecond)
}

jobs[len(jobs)-1].ctrlChan <- jobForceStart
jobs[len(jobs)-2].ctrlChan <- jobForceStart

peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)

So(peakConcurrent, ShouldEqual, CONCURRENT)
So(counterFailed, ShouldEqual, 0)

time.Sleep(1 * time.Second)

// fmt.Println("Restart them")

for _, job := range jobs {
job.ctrlChan <- jobStart
}

peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)

So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)

for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
})
}
47 changes: 43 additions & 4 deletions worker/provider_test.go
Original file line number Diff line number Diff line change
@@ -79,11 +79,12 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)

targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf(
"syncing to %s\n"+
"%s\n"+
"Done\n",
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+
@@ -144,11 +145,12 @@ exit 0
err = ioutil.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)

targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf(
"syncing to %s\n"+
"%s\n"+
"Done\n",
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"%s %s -aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+
@@ -260,6 +262,40 @@ sleep 5

})
})
Convey("Command Provider without log file should work", t, func(ctx C) {
tmpDir, err := ioutil.TempDir("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)

c := cmdConfig{
name: "run-ls",
upstreamURL: "http://mirrors.tuna.moe/",
command: "ls",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
interval: 600 * time.Second,
}

provider, err := newCmdProvider(c)
So(err, ShouldBeNil)

So(provider.IsMaster(), ShouldEqual, false)
So(provider.ZFS(), ShouldBeNil)
So(provider.Type(), ShouldEqual, provCommand)
So(provider.Name(), ShouldEqual, c.name)
So(provider.WorkingDir(), ShouldEqual, c.workingDir)
So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval)

Convey("Run the command", func() {

err = provider.Run()
So(err, ShouldBeNil)

})
})
}

func TestTwoStageRsyncProvider(t *testing.T) {
@@ -280,6 +316,8 @@ func TestTwoStageRsyncProvider(t *testing.T) {
logFile: tmpFile,
useIPv6: true,
excludeFile: tmpFile,
username: "hello",
password: "world",
}

provider, err := newTwoStageRsyncProvider(c)
@@ -306,21 +344,22 @@ exit 0
err = provider.Run()
So(err, ShouldBeNil)

targetDir, _ := filepath.EvalSymlinks(provider.WorkingDir())
expectedOutput := fmt.Sprintf(
"syncing to %s\n"+
"%s\n"+
"Done\n"+
"syncing to %s\n"+
"%s\n"+
"Done\n",
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ --safe-links "+
"--timeout=120 --contimeout=120 --exclude dists/ -6 "+
"--exclude-from %s %s %s",
provider.excludeFile, provider.upstreamURL, provider.WorkingDir(),
),
provider.WorkingDir(),
targetDir,
fmt.Sprintf(
"-aHvh --no-o --no-g --stats --exclude .~tmp~/ "+
"--delete --delete-after --delay-updates --safe-links "+
8 changes: 7 additions & 1 deletion worker/rsync_provider.go
Original file line number Diff line number Diff line change
@@ -81,6 +81,12 @@ func (p *rsyncProvider) Run() error {
}

func (p *rsyncProvider) Start() error {
p.Lock()
defer p.Unlock()

if p.IsRunning() {
return errors.New("provider is currently running")
}

env := map[string]string{}
if p.username != "" {
@@ -94,7 +100,7 @@ func (p *rsyncProvider) Start() error {
command = append(command, p.upstreamURL, p.WorkingDir())

p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil {
if err := p.prepareLogFile(false); err != nil {
return err
}

3 changes: 3 additions & 0 deletions worker/runner.go
Original file line number Diff line number Diff line change
@@ -118,6 +118,9 @@ func (c *cmdJob) Wait() error {
return c.retErr
default:
err := c.cmd.Wait()
if c.cmd.Stdout != nil {
c.cmd.Stdout.(*os.File).Close()
}
c.retErr = err
close(c.finished)
return err
15 changes: 11 additions & 4 deletions worker/two_stage_rsync_provider.go
Original file line number Diff line number Diff line change
@@ -108,7 +108,12 @@ func (p *twoStageRsyncProvider) Options(stage int) ([]string, error) {
}

func (p *twoStageRsyncProvider) Run() error {
defer p.Wait()
p.Lock()
defer p.Unlock()

if p.IsRunning() {
return errors.New("provider is currently running")
}

env := map[string]string{}
if p.username != "" {
@@ -129,17 +134,19 @@ func (p *twoStageRsyncProvider) Run() error {
command = append(command, p.upstreamURL, p.WorkingDir())

p.cmd = newCmdJob(p, command, p.WorkingDir(), env)
if err := p.prepareLogFile(); err != nil {
if err := p.prepareLogFile(stage > 1); err != nil {
return err
}

if err = p.cmd.Start(); err != nil {
return err
}
p.isRunning.Store(true)
logger.Debugf("set isRunning to true: %s", p.Name())

err = p.cmd.Wait()
p.isRunning.Store(false)
p.Unlock()
err = p.Wait()
p.Lock()
if err != nil {
return err
}
6 changes: 5 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
@@ -219,7 +219,11 @@ func (w *Worker) makeHTTPServer() {
}
switch cmd.Cmd {
case CmdStart:
job.ctrlChan <- jobStart
if cmd.Options["force"] {
job.ctrlChan <- jobForceStart
} else {
job.ctrlChan <- jobStart
}
case CmdRestart:
job.ctrlChan <- jobRestart
case CmdStop:

0 comments on commit 628266a

Please sign in to comment.