Skip to content

Commit

Permalink
Merge pull request tuna#82 from tuna/dev
Browse files Browse the repository at this point in the history
New feature: remove a worker with tunasynctl
  • Loading branch information
bigeagle authored May 31, 2018
2 parents 628266a + 7e601d9 commit ca106f1
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 3 deletions.
58 changes: 58 additions & 0 deletions cmd/tunasynctl/tunasynctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,52 @@ func updateMirrorSize(c *cli.Context) error {
return nil
}

func removeWorker(c *cli.Context) error {
args := c.Args()
if len(args) != 0 {
return cli.NewExitError("Usage: tunasynctl -w <worker-id>", 1)
}
workerID := c.String("worker")
if len(workerID) == 0 {
return cli.NewExitError("Please specify the <worker-id>", 1)
}
url := fmt.Sprintf("%s/workers/%s", baseURL, workerID)

req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
logger.Panicf("Invalid HTTP Request: %s", err.Error())
}
resp, err := client.Do(req)

if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return cli.NewExitError(
fmt.Sprintf("Failed to parse response: %s", err.Error()),
1)
}

return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+
" command: HTTP status code is not 200: %s", body),
1)
}

res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
if res["message"] == "deleted" {
logger.Info("Successfully removed the worker")
} else {
logger.Info("Failed to remove the worker")
}
return nil
}

func flushDisabledJobs(c *cli.Context) error {
req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil)
if err != nil {
Expand Down Expand Up @@ -445,6 +491,18 @@ func main() {
Flags: commonFlags,
Action: initializeWrapper(listWorkers),
},
{
Name: "rm-worker",
Usage: "Remove a worker",
Flags: append(
commonFlags,
cli.StringFlag{
Name: "worker, w",
Usage: "worker-id of the worker to be removed",
},
),
Action: initializeWrapper(removeWorker),
},
{
Name: "set-size",
Usage: "Set mirror size",
Expand Down
14 changes: 14 additions & 0 deletions manager/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type dbAdapter interface {
Init() error
ListWorkers() ([]WorkerStatus, error)
GetWorker(workerID string) (WorkerStatus, error)
DeleteWorker(workerID string) error
CreateWorker(w WorkerStatus) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
Expand Down Expand Up @@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) {
return
}

func (b *boltAdapter) DeleteWorker(workerID string) (err error) {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
v := bucket.Get([]byte(workerID))
if v == nil {
return fmt.Errorf("invalid workerID %s", workerID)
}
err := bucket.Delete([]byte(workerID))
return err
})
return
}

func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_workerBucketKey))
Expand Down
24 changes: 21 additions & 3 deletions manager/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) {
So(err, ShouldBeNil)
}

Convey("get exists worker", func() {
Convey("get existent worker", func() {
_, err := boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldBeNil)
})

Convey("list exist worker", func() {
Convey("list existent workers", func() {
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2)
})

Convey("get inexist worker", func() {
Convey("get non-existent worker", func() {
_, err := boltDB.GetWorker("invalid workerID")
So(err, ShouldNotBeNil)
})

Convey("delete existent worker", func() {
err := boltDB.DeleteWorker(testWorkerIDs[0])
So(err, ShouldBeNil)
_, err = boltDB.GetWorker(testWorkerIDs[0])
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 1)
})

Convey("delete non-existent worker", func() {
err := boltDB.DeleteWorker("invalid workerID")
So(err, ShouldNotBeNil)
ws, err := boltDB.ListWorkers()
So(err, ShouldBeNil)
So(len(ws), ShouldEqual, 2)
})
})

Convey("update mirror status", func() {
Expand Down
18 changes: 18 additions & 0 deletions manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func GetTUNASyncManager(cfg *Config) *Manager {
// workerID should be valid in this route group
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
{
// delete specified worker
workerValidateGroup.DELETE(":id", s.deleteWorker)
// get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status
Expand Down Expand Up @@ -159,6 +161,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"})
}

// deleteWorker deletes one worker by id
func (s *Manager) deleteWorker(c *gin.Context) {
workerID := c.Param("id")
err := s.adapter.DeleteWorker(workerID)
if err != nil {
err := fmt.Errorf("failed to delete worker: %s",
err.Error(),
)
c.Error(err)
s.returnErrJSON(c, http.StatusInternalServerError, err)
return
}
logger.Noticef("Worker <%s> deleted", workerID)
c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"})
}

// listWrokers respond with informations of all the workers
func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus
Expand Down
32 changes: 32 additions & 0 deletions manager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,33 @@ func TestHTTPServer(t *testing.T) {
So(len(actualResponseObj), ShouldEqual, 2)
})

Convey("delete an existent worker", func(ctx C) {
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_infoKey], ShouldEqual, "deleted")
})

Convey("delete non-existent worker", func(ctx C) {
invalidWorker := "test_worker233"
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil)
So(err, ShouldBeNil)
clt := &http.Client{}
resp, err := clt.Do(req)
So(err, ShouldBeNil)
defer resp.Body.Close()
res := map[string]string{}
err = json.NewDecoder(resp.Body).Decode(&res)
So(err, ShouldBeNil)
So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker)
})

Convey("flush disabled jobs", func(ctx C) {
req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -323,6 +350,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) {
return w, nil
}

func (b *mockDBAdapter) DeleteWorker(workerID string) error {
delete(b.workerStore, workerID)
return nil
}

func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
// _, ok := b.workerStore[w.ID]
// if ok {
Expand Down

0 comments on commit ca106f1

Please sign in to comment.