Skip to content

Commit

Permalink
refactor: manager server
Browse files Browse the repository at this point in the history
  • Loading branch information
bigeagle committed Apr 30, 2016
1 parent 84b7bdd commit 9865f28
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 54 deletions.
2 changes: 1 addition & 1 deletion manager/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func contextErrorLogger(c *gin.Context) {
c.Next()
}

func (s *managerServer) workerIDValidator(c *gin.Context) {
func (s *Manager) workerIDValidator(c *gin.Context) {
workerID := c.Param("id")
_, err := s.adapter.GetWorker(workerID)
if err != nil {
Expand Down
148 changes: 97 additions & 51 deletions manager/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package manager

import (
"crypto/tls"
"fmt"
"net/http"

Expand All @@ -14,13 +15,99 @@ const (
_infoKey = "message"
)

type managerServer struct {
*gin.Engine
adapter dbAdapter
var manager *Manager

// A Manager represents a manager server
type Manager struct {
cfg *Config
engine *gin.Engine
adapter dbAdapter
tlsConfig *tls.Config
}

// GetTUNASyncManager returns the manager from config
func GetTUNASyncManager(cfg *Config) *Manager {
if manager != nil {
return manager
}

// create gin engine
if !cfg.Debug {
gin.SetMode(gin.ReleaseMode)
}
s := &Manager{
cfg: cfg,
engine: gin.Default(),
adapter: nil,
tlsConfig: nil,
}

if cfg.Files.CACert != "" {
tlsConfig, err := GetTLSConfig(cfg.Files.CACert)
if err != nil {
logger.Error("Error initializing TLS config: %s", err.Error())
return nil
}
s.tlsConfig = tlsConfig
}

if cfg.Files.DBFile != "" {
adapter, err := makeDBAdapter(cfg.Files.DBType, cfg.Files.DBFile)
if err != nil {
logger.Error("Error initializing DB adapter: %s", err.Error())
return nil
}
s.setDBAdapter(adapter)
}

// common log middleware
s.engine.Use(contextErrorLogger)

s.engine.GET("/ping", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
})
// list jobs, status page
s.engine.GET("/jobs", s.listAllJobs)

// list workers
s.engine.GET("/workers", s.listWorkers)
// worker online
s.engine.POST("/workers", s.registerWorker)

// workerID should be valid in this route group
workerValidateGroup := s.engine.Group("/workers", s.workerIDValidator)
// get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)

// for tunasynctl to post commands
s.engine.POST("/cmd", s.handleClientCmd)

manager = s
return s
}

func (s *Manager) setDBAdapter(adapter dbAdapter) {
s.adapter = adapter
}

// Run runs the manager server forever
func (s *Manager) Run() {
addr := fmt.Sprintf("%s:%d", s.cfg.Server.Addr, s.cfg.Server.Port)
if s.cfg.Server.SSLCert == "" && s.cfg.Server.SSLKey == "" {
if err := s.engine.Run(addr); err != nil {
panic(err)
}
} else {
if err := s.engine.RunTLS(addr, s.cfg.Server.SSLCert, s.cfg.Server.SSLKey); err != nil {
panic(err)
}
}
}

// listAllJobs repond with all jobs of specified workers
func (s *managerServer) listAllJobs(c *gin.Context) {
func (s *Manager) listAllJobs(c *gin.Context) {
mirrorStatusList, err := s.adapter.ListAllMirrorStatus()
if err != nil {
err := fmt.Errorf("failed to list all mirror status: %s",
Expand All @@ -41,7 +128,7 @@ func (s *managerServer) listAllJobs(c *gin.Context) {
}

// listWrokers respond with informations of all the workers
func (s *managerServer) listWorkers(c *gin.Context) {
func (s *Manager) listWorkers(c *gin.Context) {
var workerInfos []WorkerStatus
workers, err := s.adapter.ListWorkers()
if err != nil {
Expand All @@ -63,7 +150,7 @@ func (s *managerServer) listWorkers(c *gin.Context) {
}

// registerWorker register an newly-online worker
func (s *managerServer) registerWorker(c *gin.Context) {
func (s *Manager) registerWorker(c *gin.Context) {
var _worker WorkerStatus
c.BindJSON(&_worker)
newWorker, err := s.adapter.CreateWorker(_worker)
Expand All @@ -80,7 +167,7 @@ func (s *managerServer) registerWorker(c *gin.Context) {
}

// listJobsOfWorker respond with all the jobs of the specified worker
func (s *managerServer) listJobsOfWorker(c *gin.Context) {
func (s *Manager) listJobsOfWorker(c *gin.Context) {
workerID := c.Param("id")
mirrorStatusList, err := s.adapter.ListMirrorStatus(workerID)
if err != nil {
Expand All @@ -94,13 +181,13 @@ func (s *managerServer) listJobsOfWorker(c *gin.Context) {
c.JSON(http.StatusOK, mirrorStatusList)
}

func (s *managerServer) returnErrJSON(c *gin.Context, code int, err error) {
func (s *Manager) returnErrJSON(c *gin.Context, code int, err error) {
c.JSON(code, gin.H{
_errorKey: err.Error(),
})
}

func (s *managerServer) updateJobOfWorker(c *gin.Context) {
func (s *Manager) updateJobOfWorker(c *gin.Context) {
workerID := c.Param("id")
var status MirrorStatus
c.BindJSON(&status)
Expand All @@ -117,7 +204,7 @@ func (s *managerServer) updateJobOfWorker(c *gin.Context) {
c.JSON(http.StatusOK, newStatus)
}

func (s *managerServer) handleClientCmd(c *gin.Context) {
func (s *Manager) handleClientCmd(c *gin.Context) {
var clientCmd ClientCmd
c.BindJSON(&clientCmd)
workerID := clientCmd.WorkerID
Expand Down Expand Up @@ -153,44 +240,3 @@ func (s *managerServer) handleClientCmd(c *gin.Context) {
// TODO: check response for success
c.JSON(http.StatusOK, gin.H{_infoKey: "successfully send command to worker " + workerID})
}

func (s *managerServer) setDBAdapter(adapter dbAdapter) {
s.adapter = adapter
}

func makeHTTPServer(debug bool) *managerServer {
// create gin engine
if !debug {
gin.SetMode(gin.ReleaseMode)
}
s := &managerServer{
gin.Default(),
nil,
}

// common log middleware
s.Use(contextErrorLogger)

s.GET("/ping", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{_infoKey: "pong"})
})
// list jobs, status page
s.GET("/jobs", s.listAllJobs)

// list workers
s.GET("/workers", s.listWorkers)
// worker online
s.POST("/workers", s.registerWorker)

// workerID should be valid in this route group
workerValidateGroup := s.Group("/workers", s.workerIDValidator)
// get job list
workerValidateGroup.GET(":id/jobs", s.listJobsOfWorker)
// post job status
workerValidateGroup.POST(":id/jobs/:job", s.updateJobOfWorker)

// for tunasynctl to post commands
s.POST("/cmd", s.handleClientCmd)

return s
}
4 changes: 2 additions & 2 deletions manager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
func TestHTTPServer(t *testing.T) {
Convey("HTTP server should work", t, func(ctx C) {
InitLogger(true, true, false)
s := makeHTTPServer(false)
s := GetTUNASyncManager(&Config{Debug: false})
So(s, ShouldNotBeNil)
s.setDBAdapter(&mockDBAdapter{
workerStore: map[string]WorkerStatus{
Expand All @@ -35,7 +35,7 @@ func TestHTTPServer(t *testing.T) {
port := rand.Intn(10000) + 20000
baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
go func() {
s.Run(fmt.Sprintf("127.0.0.1:%d", port))
s.engine.Run(fmt.Sprintf("127.0.0.1:%d", port))
}()
time.Sleep(50 * time.Microsecond)
resp, err := http.Get(baseURL + "/ping")
Expand Down

0 comments on commit 9865f28

Please sign in to comment.