diff --git a/cmd/tunasynctl/tunasynctl.go b/cmd/tunasynctl/tunasynctl.go index 3e2d6fe..52ce6f8 100644 --- a/cmd/tunasynctl/tunasynctl.go +++ b/cmd/tunasynctl/tunasynctl.go @@ -236,6 +236,52 @@ func updateMirrorSize(c *cli.Context) error { return nil } +func removeWorker(c *cli.Context) error { + args := c.Args() + if len(args) != 0 { + return cli.NewExitError("Usage: tunasynctl -w ", 1) + } + workerID := c.String("worker") + if len(workerID) == 0 { + return cli.NewExitError("Please specify the ", 1) + } + url := fmt.Sprintf("%s/workers/%s", baseURL, workerID) + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + logger.Panicf("Invalid HTTP Request: %s", err.Error()) + } + resp, err := client.Do(req) + + if err != nil { + return cli.NewExitError( + fmt.Sprintf("Failed to send request to manager: %s", err.Error()), 1) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return cli.NewExitError( + fmt.Sprintf("Failed to parse response: %s", err.Error()), + 1) + } + + return cli.NewExitError(fmt.Sprintf("Failed to correctly send"+ + " command: HTTP status code is not 200: %s", body), + 1) + } + + res := map[string]string{} + err = json.NewDecoder(resp.Body).Decode(&res) + if res["message"] == "deleted" { + logger.Info("Successfully removed the worker") + } else { + logger.Info("Failed to remove the worker") + } + return nil +} + func flushDisabledJobs(c *cli.Context) error { req, err := http.NewRequest("DELETE", baseURL+flushDisabledPath, nil) if err != nil { @@ -445,6 +491,18 @@ func main() { Flags: commonFlags, Action: initializeWrapper(listWorkers), }, + { + Name: "rm-worker", + Usage: "Remove a worker", + Flags: append( + commonFlags, + cli.StringFlag{ + Name: "worker, w", + Usage: "worker-id of the worker to be removed", + }, + ), + Action: initializeWrapper(removeWorker), + }, { Name: "set-size", Usage: "Set mirror size", diff --git a/manager/db.go b/manager/db.go index a88f26e..982f701 100644 --- a/manager/db.go +++ b/manager/db.go @@ -14,6 +14,7 @@ type dbAdapter interface { Init() error ListWorkers() ([]WorkerStatus, error) GetWorker(workerID string) (WorkerStatus, error) + DeleteWorker(workerID string) error CreateWorker(w WorkerStatus) (WorkerStatus, error) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) @@ -95,6 +96,19 @@ func (b *boltAdapter) GetWorker(workerID string) (w WorkerStatus, err error) { return } +func (b *boltAdapter) DeleteWorker(workerID string) (err error) { + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(_workerBucketKey)) + v := bucket.Get([]byte(workerID)) + if v == nil { + return fmt.Errorf("invalid workerID %s", workerID) + } + err := bucket.Delete([]byte(workerID)) + return err + }) + return +} + func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { err := b.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(_workerBucketKey)) diff --git a/manager/db_test.go b/manager/db_test.go index 5e6e79d..1abc108 100644 --- a/manager/db_test.go +++ b/manager/db_test.go @@ -40,21 +40,39 @@ func TestBoltAdapter(t *testing.T) { So(err, ShouldBeNil) } - Convey("get exists worker", func() { + Convey("get existent worker", func() { _, err := boltDB.GetWorker(testWorkerIDs[0]) So(err, ShouldBeNil) }) - Convey("list exist worker", func() { + Convey("list existent workers", func() { ws, err := boltDB.ListWorkers() So(err, ShouldBeNil) So(len(ws), ShouldEqual, 2) }) - Convey("get inexist worker", func() { + Convey("get non-existent worker", func() { _, err := boltDB.GetWorker("invalid workerID") So(err, ShouldNotBeNil) }) + + Convey("delete existent worker", func() { + err := boltDB.DeleteWorker(testWorkerIDs[0]) + So(err, ShouldBeNil) + _, err = boltDB.GetWorker(testWorkerIDs[0]) + So(err, ShouldNotBeNil) + ws, err := boltDB.ListWorkers() + So(err, ShouldBeNil) + So(len(ws), ShouldEqual, 1) + }) + + Convey("delete non-existent worker", func() { + err := boltDB.DeleteWorker("invalid workerID") + So(err, ShouldNotBeNil) + ws, err := boltDB.ListWorkers() + So(err, ShouldBeNil) + So(len(ws), ShouldEqual, 2) + }) }) Convey("update mirror status", func() { diff --git a/manager/server.go b/manager/server.go index 2605446..56ac986 100644 --- a/manager/server.go +++ b/manager/server.go @@ -84,6 +84,8 @@ func GetTUNASyncManager(cfg *Config) *Manager { // workerID should be valid in this route group workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator) { + // delete specified worker + workerValidateGroup.DELETE(":id", s.deleteWorker) // get job list workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker) // post job status @@ -159,6 +161,22 @@ func (s *Manager) flushDisabledJobs(c *gin.Context) { c.JSON(http.StatusOK, gin.H{_infoKey: "flushed"}) } +// deleteWorker deletes one worker by id +func (s *Manager) deleteWorker(c *gin.Context) { + workerID := c.Param("id") + err := s.adapter.DeleteWorker(workerID) + if err != nil { + err := fmt.Errorf("failed to delete worker: %s", + err.Error(), + ) + c.Error(err) + s.returnErrJSON(c, http.StatusInternalServerError, err) + return + } + logger.Noticef("Worker <%s> deleted", workerID) + c.JSON(http.StatusOK, gin.H{_infoKey: "deleted"}) +} + // listWrokers respond with informations of all the workers func (s *Manager) listWorkers(c *gin.Context) { var workerInfos []WorkerStatus diff --git a/manager/server_test.go b/manager/server_test.go index 27f3cd4..73b2454 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -79,6 +79,33 @@ func TestHTTPServer(t *testing.T) { So(len(actualResponseObj), ShouldEqual, 2) }) + Convey("delete an existent worker", func(ctx C) { + req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, w.ID), nil) + So(err, ShouldBeNil) + clt := &http.Client{} + resp, err := clt.Do(req) + So(err, ShouldBeNil) + defer resp.Body.Close() + res := map[string]string{} + err = json.NewDecoder(resp.Body).Decode(&res) + So(err, ShouldBeNil) + So(res[_infoKey], ShouldEqual, "deleted") + }) + + Convey("delete non-existent worker", func(ctx C) { + invalidWorker := "test_worker233" + req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/workers/%s", baseURL, invalidWorker), nil) + So(err, ShouldBeNil) + clt := &http.Client{} + resp, err := clt.Do(req) + So(err, ShouldBeNil) + defer resp.Body.Close() + res := map[string]string{} + err = json.NewDecoder(resp.Body).Decode(&res) + So(err, ShouldBeNil) + So(res[_errorKey], ShouldEqual, "invalid workerID "+invalidWorker) + }) + Convey("flush disabled jobs", func(ctx C) { req, err := http.NewRequest("DELETE", baseURL+"/jobs/disabled", nil) So(err, ShouldBeNil) @@ -323,6 +350,11 @@ func (b *mockDBAdapter) GetWorker(workerID string) (WorkerStatus, error) { return w, nil } +func (b *mockDBAdapter) DeleteWorker(workerID string) error { + delete(b.workerStore, workerID) + return nil +} + func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) { // _, ok := b.workerStore[w.ID] // if ok {