Skip to content

Commit

Permalink
switch back to default http package
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Oct 10, 2015
1 parent 6100f15 commit 09f8655
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 53 deletions.
3 changes: 2 additions & 1 deletion driver/scheduler/scheduler_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (s *Scheduler) Fetch(demands []market.Demand) {
result, err := Assign(s.Leader, &request)
if err != nil {
log.Printf("%s Failed to allocate: %v", s.Leader, err)
time.Sleep(time.Millisecond * time.Duration(15000+rand.Int63n(5000)))
} else {
if len(result.Allocations) == 0 {
log.Printf("%s Failed to allocate any server.", s.Leader)
Expand Down Expand Up @@ -77,7 +78,7 @@ func Assign(leader string, request *resource.AllocationRequest) (*resource.Alloc
return nil, fmt.Errorf("/agent/assign result JSON unmarshal error:%v, json:%s", err, string(jsonBlob))
}
if ret.Error != "" {
return nil, fmt.Errorf("/agent/assign error:%v, json:%s", ret.Error, string(jsonBlob))
return nil, fmt.Errorf("/agent/assign error:%v", ret.Error)
}
return &ret, nil
}
5 changes: 5 additions & 0 deletions resource/resource.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package resource

import (
"fmt"
"net/http"
"net/url"
"strconv"
Expand All @@ -12,6 +13,10 @@ type ComputeResource struct {
MemoryMB int64 `json:"memoryMB,omitempty"`
}

func (a ComputeResource) String() string {
return fmt.Sprintf("CPUCount %d Level %d Memory %d MB", a.CPUCount, a.CPULevel, a.MemoryMB)
}

func (a ComputeResource) Minus(b ComputeResource) ComputeResource {
return ComputeResource{
CPUCount: a.CPUCount - b.CPUCount,
Expand Down
40 changes: 19 additions & 21 deletions resource/service_discovery/leader/handler_agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,51 @@ import (
"strings"

"github.com/chrislusf/glow/resource"
"github.com/labstack/echo"
"github.com/chrislusf/glow/util"
)

func (tl *TeamLeader) listAgentsHandler(c *echo.Context) error {
c.JSON(http.StatusAccepted, tl.LeaderResource.Topology)
return nil
func (tl *TeamLeader) listAgentsHandler(w http.ResponseWriter, r *http.Request) {
util.Json(w, r, http.StatusAccepted, tl.LeaderResource.Topology)
}

func (tl *TeamLeader) requestAgentHandler(c *echo.Context) error {
requestBlob := []byte(c.Request().FormValue("request"))
func (tl *TeamLeader) requestAgentHandler(w http.ResponseWriter, r *http.Request) {
requestBlob := []byte(r.FormValue("request"))
var request resource.AllocationRequest
err := json.Unmarshal(requestBlob, &request)
if err != nil {
return fmt.Errorf("request JSON unmarshal error:%v, json:%s", err, string(requestBlob))
util.Error(w, r, http.StatusBadRequest, fmt.Sprintf("request JSON unmarshal error:%v, json:%s", err, string(requestBlob)))
return
}

fmt.Printf("request:\n%+v\n", request)
// fmt.Printf("request:\n%+v\n", request)

result := tl.allocate(&request)
fmt.Printf("result: %v\n%+v\n", result.Error, result.Allocations)
// fmt.Printf("result: %v\n%+v\n", result.Error, result.Allocations)
if result.Error != "" {
c.JSON(http.StatusNotFound, result)
util.Json(w, r, http.StatusNotFound, result)
return
}

c.JSON(http.StatusAccepted, result)
util.Json(w, r, http.StatusAccepted, result)

return nil
}

func (tl *TeamLeader) updateAgentHandler(c *echo.Context) error {
servicePortString := c.Request().FormValue("servicePort")
func (tl *TeamLeader) updateAgentHandler(w http.ResponseWriter, r *http.Request) {
servicePortString := r.FormValue("servicePort")
servicePort, err := strconv.Atoi(servicePortString)
if err != nil {
log.Printf("Strange: servicePort not found: %s, %v", servicePortString, err)
}
host := c.Request().Host
host := r.Host
if strings.Contains(host, ":") {
host = host[0:strings.Index(host, ":")]
}
// println("received agent update from", host+":"+servicePort)
res, alloc := resource.NewComputeResourceFromRequest(c.Request())
res, alloc := resource.NewComputeResourceFromRequest(r)
ai := &resource.AgentInformation{
Location: resource.Location{
DataCenter: c.Request().FormValue("dataCenter"),
Rack: c.Request().FormValue("rack"),
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
Server: host,
Port: servicePort,
},
Expand All @@ -67,7 +67,5 @@ func (tl *TeamLeader) updateAgentHandler(c *echo.Context) error {

tl.LeaderResource.UpdateAgentInformation(ai)

c.NoContent(http.StatusAccepted)

return nil
w.WriteHeader(http.StatusAccepted)
}
35 changes: 19 additions & 16 deletions resource/service_discovery/leader/handler_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,30 @@ import (
"strings"
"time"

"github.com/labstack/echo"
"github.com/chrislusf/glow/util"
)

type ChannelInformation struct {
Location string
LastHeartBeat time.Time
}

func (tl *TeamLeader) listChannelsHandler(c *echo.Context) error {
path := c.P(0)
func (tl *TeamLeader) handleChannel(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
tl.updateChannelHandler(w, r)
} else {
tl.listChannelsHandler(w, r)
}
}

func (tl *TeamLeader) listChannelsHandler(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path[len("/channel/"):]

freshChannels := make([]*ChannelInformation, 0)
rps, ok := tl.channels[path]
if !ok {
c.JSON(http.StatusOK, freshChannels)
return nil
util.Json(w, r, http.StatusOK, freshChannels)
return
}
for _, rp := range rps {
if rp.LastHeartBeat.Add(TimeOutLimit * time.Second).After(time.Now()) {
Expand All @@ -35,19 +43,18 @@ func (tl *TeamLeader) listChannelsHandler(c *echo.Context) error {
tl.channelsLock.Lock()
tl.channels[path] = freshChannels
tl.channelsLock.Unlock()
c.JSON(http.StatusOK, freshChannels)
return nil
util.Json(w, r, http.StatusOK, freshChannels)
}

// put agent information list under a path
func (tl *TeamLeader) updateChannelHandler(c *echo.Context) error {
servicePort := c.Request().FormValue("servicePort")
host := c.Request().Host
func (tl *TeamLeader) updateChannelHandler(w http.ResponseWriter, r *http.Request) {
servicePort := r.FormValue("servicePort")
host := r.Host
if strings.Contains(host, ":") {
host = host[0:strings.Index(host, ":")]
}
location := host + ":" + servicePort
path := c.P(0)
path := r.URL.Path[len("/channel/"):]
// println(path, ":", location)

rps, ok := tl.channels[path]
Expand All @@ -72,10 +79,6 @@ func (tl *TeamLeader) updateChannelHandler(c *echo.Context) error {
tl.channels[path] = rps
tl.channelsLock.Unlock()

c.JSON(http.StatusAccepted, tl.channels)
util.Json(w, r, http.StatusAccepted, tl.channels)

// id, _ := strconv.Atoi(c.Param("url"))
// utils.WriteJson(c, http.StatusOK, infos)
// c.String(http.StatusOK, c.P(0)+" runs at "+location.String()+".")
return nil
}
24 changes: 11 additions & 13 deletions resource/service_discovery/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/http"
"sync"

"github.com/labstack/echo"
"github.com/chrislusf/glow/util"
)

type TeamLeader struct {
Expand All @@ -16,25 +16,23 @@ type TeamLeader struct {
LeaderResource *LeaderResource
}

func (tl *TeamLeader) statusHandler(c *echo.Context) error {
func (tl *TeamLeader) statusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
infos["Version"] = "0.001"
// utils.WriteJson(c, http.StatusOK, infos)
c.String(http.StatusOK, "Hello World")
return nil
util.Json(w, r, http.StatusOK, infos)
}

func RunLeader(listenOn string) {
tl := &TeamLeader{}
tl.channels = make(map[string][]*ChannelInformation)
tl.LeaderResource = NewLeaderResource()

e := echo.New()
e.Get("/", tl.statusHandler)
e.Post("/agent/assign", tl.requestAgentHandler)
e.Post("/agent/update", tl.updateAgentHandler)
e.Get("/agent/*", tl.listAgentsHandler)
e.Post("/channel/*", tl.updateChannelHandler)
e.Get("/channel/*", tl.listChannelsHandler)
e.Run(listenOn)
http.HandleFunc("/", tl.statusHandler)
http.HandleFunc("/agent/assign", tl.requestAgentHandler)
http.HandleFunc("/agent/update", tl.updateAgentHandler)
http.HandleFunc("/agent/", tl.listAgentsHandler)
http.HandleFunc("/channel/", tl.handleChannel)

http.ListenAndServe(listenOn, nil)

}
4 changes: 2 additions & 2 deletions resource/service_discovery/leader/leader_allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (tl *TeamLeader) allocateServersOnRack(dc *resource.DataCenter, rack *resou
}
request := requests[j]

fmt.Printf("available %v, requested %v\n", available, request.ComputeResource)
// fmt.Printf("available %v, requested %v\n", available, request.ComputeResource)
if available.Covers(request.ComputeResource) {
allocated = append(allocated, resource.Allocation{
Location: agent.Location,
Expand Down Expand Up @@ -117,7 +117,7 @@ func (tl *TeamLeader) findDataCenter(req *resource.AllocationRequest) (*resource
}
}
if !found {
return nil, fmt.Errorf("Total compute resource is too big for any data center:%+v", totalComputeResource)
return nil, fmt.Errorf("Total compute resource is too big for any data center:%s", totalComputeResource)
}

// find a data center with unallocated resources
Expand Down
46 changes: 46 additions & 0 deletions util/http_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package util

import (
"encoding/json"
"fmt"
"net/http"
)

func Error(w http.ResponseWriter, r *http.Request, httpStatus int, obj string) (err error) {
return Json(w, r, httpStatus, map[string]string{
"error": obj,
})
}

func Json(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
var bytes []byte
if r.FormValue("pretty") != "" {
bytes, err = json.MarshalIndent(obj, "", " ")
} else {
bytes, err = json.Marshal(obj)
}
if err != nil {
return
}
callback := r.FormValue("callback")
if callback == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(httpStatus)
_, err = w.Write(bytes)
} else {
w.Header().Set("Content-Type", "application/javascript")
w.WriteHeader(httpStatus)
if _, err = w.Write([]uint8(callback)); err != nil {
return
}
if _, err = w.Write([]uint8("(")); err != nil {
return
}
fmt.Fprint(w, string(bytes))
if _, err = w.Write([]uint8(")")); err != nil {
return
}
}

return
}

0 comments on commit 09f8655

Please sign in to comment.