diff --git a/driver/scheduler/scheduler_fetch.go b/driver/scheduler/scheduler_fetch.go index cb20bfa..3bf6e58 100644 --- a/driver/scheduler/scheduler_fetch.go +++ b/driver/scheduler/scheduler_fetch.go @@ -32,6 +32,7 @@ func (s *Scheduler) Fetch(demands []market.Demand) { result, err := Assign(s.Leader, &request) if err != nil { log.Printf("%s Failed to allocate: %v", s.Leader, err) + time.Sleep(time.Millisecond * time.Duration(15000+rand.Int63n(5000))) } else { if len(result.Allocations) == 0 { log.Printf("%s Failed to allocate any server.", s.Leader) @@ -77,7 +78,7 @@ func Assign(leader string, request *resource.AllocationRequest) (*resource.Alloc return nil, fmt.Errorf("/agent/assign result JSON unmarshal error:%v, json:%s", err, string(jsonBlob)) } if ret.Error != "" { - return nil, fmt.Errorf("/agent/assign error:%v, json:%s", ret.Error, string(jsonBlob)) + return nil, fmt.Errorf("/agent/assign error:%v", ret.Error) } return &ret, nil } diff --git a/resource/resource.go b/resource/resource.go index fac9442..5cd6a9b 100644 --- a/resource/resource.go +++ b/resource/resource.go @@ -1,6 +1,7 @@ package resource import ( + "fmt" "net/http" "net/url" "strconv" @@ -12,6 +13,10 @@ type ComputeResource struct { MemoryMB int64 `json:"memoryMB,omitempty"` } +func (a ComputeResource) String() string { + return fmt.Sprintf("CPUCount %d Level %d Memory %d MB", a.CPUCount, a.CPULevel, a.MemoryMB) +} + func (a ComputeResource) Minus(b ComputeResource) ComputeResource { return ComputeResource{ CPUCount: a.CPUCount - b.CPUCount, diff --git a/resource/service_discovery/leader/handler_agents.go b/resource/service_discovery/leader/handler_agents.go index 0bfb368..2229fb7 100644 --- a/resource/service_discovery/leader/handler_agents.go +++ b/resource/service_discovery/leader/handler_agents.go @@ -11,51 +11,51 @@ import ( "strings" "github.com/chrislusf/glow/resource" - "github.com/labstack/echo" + "github.com/chrislusf/glow/util" ) -func (tl *TeamLeader) listAgentsHandler(c *echo.Context) error { - c.JSON(http.StatusAccepted, tl.LeaderResource.Topology) - return nil +func (tl *TeamLeader) listAgentsHandler(w http.ResponseWriter, r *http.Request) { + util.Json(w, r, http.StatusAccepted, tl.LeaderResource.Topology) } -func (tl *TeamLeader) requestAgentHandler(c *echo.Context) error { - requestBlob := []byte(c.Request().FormValue("request")) +func (tl *TeamLeader) requestAgentHandler(w http.ResponseWriter, r *http.Request) { + requestBlob := []byte(r.FormValue("request")) var request resource.AllocationRequest err := json.Unmarshal(requestBlob, &request) if err != nil { - return fmt.Errorf("request JSON unmarshal error:%v, json:%s", err, string(requestBlob)) + util.Error(w, r, http.StatusBadRequest, fmt.Sprintf("request JSON unmarshal error:%v, json:%s", err, string(requestBlob))) + return } - fmt.Printf("request:\n%+v\n", request) + // fmt.Printf("request:\n%+v\n", request) result := tl.allocate(&request) - fmt.Printf("result: %v\n%+v\n", result.Error, result.Allocations) + // fmt.Printf("result: %v\n%+v\n", result.Error, result.Allocations) if result.Error != "" { - c.JSON(http.StatusNotFound, result) + util.Json(w, r, http.StatusNotFound, result) + return } - c.JSON(http.StatusAccepted, result) + util.Json(w, r, http.StatusAccepted, result) - return nil } -func (tl *TeamLeader) updateAgentHandler(c *echo.Context) error { - servicePortString := c.Request().FormValue("servicePort") +func (tl *TeamLeader) updateAgentHandler(w http.ResponseWriter, r *http.Request) { + servicePortString := r.FormValue("servicePort") servicePort, err := strconv.Atoi(servicePortString) if err != nil { log.Printf("Strange: servicePort not found: %s, %v", servicePortString, err) } - host := c.Request().Host + host := r.Host if strings.Contains(host, ":") { host = host[0:strings.Index(host, ":")] } // println("received agent update from", host+":"+servicePort) - res, alloc := resource.NewComputeResourceFromRequest(c.Request()) + res, alloc := resource.NewComputeResourceFromRequest(r) ai := &resource.AgentInformation{ Location: resource.Location{ - DataCenter: c.Request().FormValue("dataCenter"), - Rack: c.Request().FormValue("rack"), + DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), Server: host, Port: servicePort, }, @@ -67,7 +67,5 @@ func (tl *TeamLeader) updateAgentHandler(c *echo.Context) error { tl.LeaderResource.UpdateAgentInformation(ai) - c.NoContent(http.StatusAccepted) - - return nil + w.WriteHeader(http.StatusAccepted) } diff --git a/resource/service_discovery/leader/handler_channels.go b/resource/service_discovery/leader/handler_channels.go index 9c9a18b..adb7478 100644 --- a/resource/service_discovery/leader/handler_channels.go +++ b/resource/service_discovery/leader/handler_channels.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/labstack/echo" + "github.com/chrislusf/glow/util" ) type ChannelInformation struct { @@ -15,14 +15,22 @@ type ChannelInformation struct { LastHeartBeat time.Time } -func (tl *TeamLeader) listChannelsHandler(c *echo.Context) error { - path := c.P(0) +func (tl *TeamLeader) handleChannel(w http.ResponseWriter, r *http.Request) { + if r.Method == "POST" { + tl.updateChannelHandler(w, r) + } else { + tl.listChannelsHandler(w, r) + } +} + +func (tl *TeamLeader) listChannelsHandler(w http.ResponseWriter, r *http.Request) { + path := r.URL.Path[len("/channel/"):] freshChannels := make([]*ChannelInformation, 0) rps, ok := tl.channels[path] if !ok { - c.JSON(http.StatusOK, freshChannels) - return nil + util.Json(w, r, http.StatusOK, freshChannels) + return } for _, rp := range rps { if rp.LastHeartBeat.Add(TimeOutLimit * time.Second).After(time.Now()) { @@ -35,19 +43,18 @@ func (tl *TeamLeader) listChannelsHandler(c *echo.Context) error { tl.channelsLock.Lock() tl.channels[path] = freshChannels tl.channelsLock.Unlock() - c.JSON(http.StatusOK, freshChannels) - return nil + util.Json(w, r, http.StatusOK, freshChannels) } // put agent information list under a path -func (tl *TeamLeader) updateChannelHandler(c *echo.Context) error { - servicePort := c.Request().FormValue("servicePort") - host := c.Request().Host +func (tl *TeamLeader) updateChannelHandler(w http.ResponseWriter, r *http.Request) { + servicePort := r.FormValue("servicePort") + host := r.Host if strings.Contains(host, ":") { host = host[0:strings.Index(host, ":")] } location := host + ":" + servicePort - path := c.P(0) + path := r.URL.Path[len("/channel/"):] // println(path, ":", location) rps, ok := tl.channels[path] @@ -72,10 +79,6 @@ func (tl *TeamLeader) updateChannelHandler(c *echo.Context) error { tl.channels[path] = rps tl.channelsLock.Unlock() - c.JSON(http.StatusAccepted, tl.channels) + util.Json(w, r, http.StatusAccepted, tl.channels) - // id, _ := strconv.Atoi(c.Param("url")) - // utils.WriteJson(c, http.StatusOK, infos) - // c.String(http.StatusOK, c.P(0)+" runs at "+location.String()+".") - return nil } diff --git a/resource/service_discovery/leader/leader.go b/resource/service_discovery/leader/leader.go index f749f75..92a964c 100644 --- a/resource/service_discovery/leader/leader.go +++ b/resource/service_discovery/leader/leader.go @@ -6,7 +6,7 @@ import ( "net/http" "sync" - "github.com/labstack/echo" + "github.com/chrislusf/glow/util" ) type TeamLeader struct { @@ -16,12 +16,10 @@ type TeamLeader struct { LeaderResource *LeaderResource } -func (tl *TeamLeader) statusHandler(c *echo.Context) error { +func (tl *TeamLeader) statusHandler(w http.ResponseWriter, r *http.Request) { infos := make(map[string]interface{}) infos["Version"] = "0.001" - // utils.WriteJson(c, http.StatusOK, infos) - c.String(http.StatusOK, "Hello World") - return nil + util.Json(w, r, http.StatusOK, infos) } func RunLeader(listenOn string) { @@ -29,12 +27,12 @@ func RunLeader(listenOn string) { tl.channels = make(map[string][]*ChannelInformation) tl.LeaderResource = NewLeaderResource() - e := echo.New() - e.Get("/", tl.statusHandler) - e.Post("/agent/assign", tl.requestAgentHandler) - e.Post("/agent/update", tl.updateAgentHandler) - e.Get("/agent/*", tl.listAgentsHandler) - e.Post("/channel/*", tl.updateChannelHandler) - e.Get("/channel/*", tl.listChannelsHandler) - e.Run(listenOn) + http.HandleFunc("/", tl.statusHandler) + http.HandleFunc("/agent/assign", tl.requestAgentHandler) + http.HandleFunc("/agent/update", tl.updateAgentHandler) + http.HandleFunc("/agent/", tl.listAgentsHandler) + http.HandleFunc("/channel/", tl.handleChannel) + + http.ListenAndServe(listenOn, nil) + } diff --git a/resource/service_discovery/leader/leader_allocation.go b/resource/service_discovery/leader/leader_allocation.go index aa2646e..a21b95b 100644 --- a/resource/service_discovery/leader/leader_allocation.go +++ b/resource/service_discovery/leader/leader_allocation.go @@ -40,7 +40,7 @@ func (tl *TeamLeader) allocateServersOnRack(dc *resource.DataCenter, rack *resou } request := requests[j] - fmt.Printf("available %v, requested %v\n", available, request.ComputeResource) + // fmt.Printf("available %v, requested %v\n", available, request.ComputeResource) if available.Covers(request.ComputeResource) { allocated = append(allocated, resource.Allocation{ Location: agent.Location, @@ -117,7 +117,7 @@ func (tl *TeamLeader) findDataCenter(req *resource.AllocationRequest) (*resource } } if !found { - return nil, fmt.Errorf("Total compute resource is too big for any data center:%+v", totalComputeResource) + return nil, fmt.Errorf("Total compute resource is too big for any data center:%s", totalComputeResource) } // find a data center with unallocated resources diff --git a/util/http_handler.go b/util/http_handler.go new file mode 100644 index 0000000..8b99a58 --- /dev/null +++ b/util/http_handler.go @@ -0,0 +1,46 @@ +package util + +import ( + "encoding/json" + "fmt" + "net/http" +) + +func Error(w http.ResponseWriter, r *http.Request, httpStatus int, obj string) (err error) { + return Json(w, r, httpStatus, map[string]string{ + "error": obj, + }) +} + +func Json(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) { + var bytes []byte + if r.FormValue("pretty") != "" { + bytes, err = json.MarshalIndent(obj, "", " ") + } else { + bytes, err = json.Marshal(obj) + } + if err != nil { + return + } + callback := r.FormValue("callback") + if callback == "" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(httpStatus) + _, err = w.Write(bytes) + } else { + w.Header().Set("Content-Type", "application/javascript") + w.WriteHeader(httpStatus) + if _, err = w.Write([]uint8(callback)); err != nil { + return + } + if _, err = w.Write([]uint8("(")); err != nil { + return + } + fmt.Fprint(w, string(bytes)) + if _, err = w.Write([]uint8(")")); err != nil { + return + } + } + + return +}