Skip to content

Commit

Permalink
new feature: run "tunasynctl start" with "-f" to override concurrent …
Browse files Browse the repository at this point in the history
…job limit
  • Loading branch information
z4yx committed May 30, 2018
1 parent 6cbe91b commit c750aa1
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 50 deletions.
12 changes: 11 additions & 1 deletion cmd/tunasynctl/tunasynctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,16 @@ func cmdJob(cmd tunasync.CmdVerb) cli.ActionFunc {
"argument WORKER", 1)
}

options := map[string]bool{}
if c.Bool("force") {
options["force"] = true
}
cmd := tunasync.ClientCmd{
Cmd: cmd,
MirrorID: mirrorID,
WorkerID: c.String("worker"),
Args: argsList,
Options: options,
}
resp, err := tunasync.PostJSON(baseURL+cmdPath, cmd, client)
if err != nil {
Expand Down Expand Up @@ -410,6 +415,11 @@ func main() {
},
}

forceStartFlag := cli.BoolFlag{
Name: "force, f",
Usage: "Override the concurrent limit",
}

app.Commands = []cli.Command{
{
Name: "list",
Expand Down Expand Up @@ -450,7 +460,7 @@ func main() {
{
Name: "start",
Usage: "Start a job",
Flags: append(commonFlags, cmdFlags...),
Flags: append(append(commonFlags, cmdFlags...), forceStartFlag),
Action: initializeWrapper(cmdJob(tunasync.CmdStart)),
},
{
Expand Down
16 changes: 9 additions & 7 deletions internal/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func (c CmdVerb) String() string {
// A WorkerCmd is the command message send from the
// manager to a worker
type WorkerCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
Args []string `json:"args"`
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
Args []string `json:"args"`
Options map[string]bool `json:"options"`
}

func (c WorkerCmd) String() string {
Expand All @@ -83,8 +84,9 @@ func (c WorkerCmd) String() string {
// A ClientCmd is the command message send from client
// to the manager
type ClientCmd struct {
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"`
Args []string `json:"args"`
Cmd CmdVerb `json:"cmd"`
MirrorID string `json:"mirror_id"`
WorkerID string `json:"worker_id"`
Args []string `json:"args"`
Options map[string]bool `json:"options"`
}
1 change: 1 addition & 0 deletions manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
Cmd: clientCmd.Cmd,
MirrorID: clientCmd.MirrorID,
Args: clientCmd.Args,
Options: clientCmd.Options,
}

// update job status, even if the job did not disable successfully,
Expand Down
1 change: 1 addition & 0 deletions worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (m *mirrorJob) Run(managerChan chan<- jobMessage, semaphore chan empty) err
defer func() { <-semaphore }()
runJobWrapper(kill, jobDone)
case <-bypassSemaphore:
logger.Noticef("Concurrent limit ignored by %s", m.Name())
runJobWrapper(kill, jobDone)
case <-kill:
jobDone <- empty{}
Expand Down
98 changes: 59 additions & 39 deletions worker/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestConcurrentMirrorJobs(t *testing.T) {
c := cmdConfig{
name: fmt.Sprintf("job-%d", i),
upstreamURL: "http://mirrors.tuna.moe/",
command: "sleep 3",
command: "sleep 2",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
Expand All @@ -302,17 +302,12 @@ func TestConcurrentMirrorJobs(t *testing.T) {
managerChan := make(chan jobMessage, 10)
semaphore := make(chan empty, CONCURRENT-2)

Convey("When we run them all", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
}

countingJobs := func(managerChan chan jobMessage, totalJobs, concurrentCheck int) (peakConcurrent, counterFailed int) {
counterEnded := 0
counterRunning := 0
maxRunning := 0
counterFailed := 0
for counterEnded < CONCURRENT {
peakConcurrent = 0
counterFailed = 0
for counterEnded < totalJobs {
msg := <-managerChan
switch msg.status {
case PreSyncing:
Expand All @@ -328,13 +323,29 @@ func TestConcurrentMirrorJobs(t *testing.T) {
So(0, ShouldEqual, 1)
}
// Test if semaphore works
So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2)
if counterRunning > maxRunning {
maxRunning = counterRunning
So(counterRunning, ShouldBeLessThanOrEqualTo, concurrentCheck)
if counterRunning > peakConcurrent {
peakConcurrent = counterRunning
}
}
// select {
// case msg := <-managerChan:
// logger.Errorf("extra message received: %v", msg)
// So(0, ShouldEqual, 1)
// case <-time.After(2 * time.Second):
// }
return
}

Convey("When we run them all", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
}

peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT-2)

So(maxRunning, ShouldEqual, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)

for _, job := range jobs {
Expand All @@ -352,33 +363,42 @@ func TestConcurrentMirrorJobs(t *testing.T) {
// Cancel the one waiting for semaphore
jobs[len(jobs)-1].ctrlChan <- jobStop

counterEnded := 0
counterRunning := 0
maxRunning := 0
counterFailed := 0
for counterEnded < CONCURRENT-1 {
msg := <-managerChan
switch msg.status {
case PreSyncing:
counterRunning++
case Syncing:
case Failed:
counterFailed++
fallthrough
case Success:
counterEnded++
counterRunning--
default:
So(0, ShouldEqual, 1)
}
// Test if semaphore works
So(counterRunning, ShouldBeLessThanOrEqualTo, CONCURRENT-2)
if counterRunning > maxRunning {
maxRunning = counterRunning
}
peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT-1, CONCURRENT-2)

So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)

for _, job := range jobs {
job.ctrlChan <- jobDisable
<-job.disabled
}
})
Convey("If we override the concurrent limit", func(ctx C) {
for _, job := range jobs {
go job.Run(managerChan, semaphore)
job.ctrlChan <- jobStart
time.Sleep(200 * time.Millisecond)
}

jobs[len(jobs)-1].ctrlChan <- jobForceStart
jobs[len(jobs)-2].ctrlChan <- jobForceStart

peakConcurrent, counterFailed := countingJobs(managerChan, CONCURRENT, CONCURRENT)

So(peakConcurrent, ShouldEqual, CONCURRENT)
So(counterFailed, ShouldEqual, 0)

time.Sleep(1 * time.Second)

// fmt.Println("Restart them")

for _, job := range jobs {
job.ctrlChan <- jobStart
}

peakConcurrent, counterFailed = countingJobs(managerChan, CONCURRENT, CONCURRENT-2)

So(maxRunning, ShouldEqual, CONCURRENT-2)
So(peakConcurrent, ShouldEqual, CONCURRENT-2)
So(counterFailed, ShouldEqual, 0)

for _, job := range jobs {
Expand Down
4 changes: 2 additions & 2 deletions worker/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ sleep 5
So(err, ShouldBeNil)

c := cmdConfig{
name: "run-pwd",
name: "run-ls",
upstreamURL: "http://mirrors.tuna.moe/",
command: "pwd",
command: "ls",
workingDir: tmpDir,
logDir: tmpDir,
logFile: "/dev/null",
Expand Down
6 changes: 5 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,11 @@ func (w *Worker) makeHTTPServer() {
}
switch cmd.Cmd {
case CmdStart:
job.ctrlChan <- jobStart
if cmd.Options["force"] {
job.ctrlChan <- jobForceStart
} else {
job.ctrlChan <- jobStart
}
case CmdRestart:
job.ctrlChan <- jobRestart
case CmdStop:
Expand Down

0 comments on commit c750aa1

Please sign in to comment.