Skip to content

Commit

Permalink
rename leader to master
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Oct 18, 2015
1 parent 17a7e8a commit bed7828
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 65 deletions.
8 changes: 4 additions & 4 deletions agent/agent_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (ds *LiveDataStore) Destroy() {
}

type AgentServerOption struct {
Leader *string
Master *string
Port *int
Dir *string
DataCenter *string
Expand All @@ -51,7 +51,7 @@ type AgentServerOption struct {

type AgentServer struct {
Option *AgentServerOption
leader string
Master string
Port int
name2Store map[string]*LiveDataStore
dir string
Expand All @@ -66,7 +66,7 @@ type AgentServer struct {
func NewAgentServer(option *AgentServerOption) *AgentServer {
as := &AgentServer{
Option: option,
leader: *option.Leader,
Master: *option.Master,
Port: *option.Port,
dir: *option.Dir,
name2Store: make(map[string]*LiveDataStore),
Expand Down Expand Up @@ -116,7 +116,7 @@ func (r *AgentServer) Init() (err error) {
func (as *AgentServer) Run() {
//register agent
killHeartBeaterChan := make(chan bool, 1)
go client.NewHeartBeater(as.Port, as.leader).StartAgentHeartBeat(killHeartBeaterChan, func(values url.Values) {
go client.NewHeartBeater(as.Port, as.Master).StartAgentHeartBeat(killHeartBeaterChan, func(values url.Values) {
resource.AddToValues(values, as.computeResource, as.allocatedResource)
values.Add("dataCenter", *as.Option.DataCenter)
values.Add("rack", *as.Option.Rack)
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_server_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (as *AgentServer) handleWriteConnection(r io.Reader, name string) {
as.name2StoreLock.Unlock()

//register stream
go client.NewHeartBeater(as.Port, *as.Option.Leader).StartChannelHeartBeat(ds.killHeartBeater, name)
go client.NewHeartBeater(as.Port, *as.Option.Master).StartChannelHeartBeat(ds.killHeartBeater, name)

// println(name, "start writing.")

Expand Down
20 changes: 10 additions & 10 deletions glow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ import (
a "github.com/chrislusf/glow/agent"
r "github.com/chrislusf/glow/io/receiver"
s "github.com/chrislusf/glow/io/sender"
l "github.com/chrislusf/glow/resource/service_discovery/leader"
m "github.com/chrislusf/glow/resource/service_discovery/master"
)

var (
app = kingpin.New("glow", "A command-line net channel.")

leader = app.Command("leader", "Start a leader process")
leaderPort = leader.Flag("port", "listening port").Default("8930").Int()
leaderIp = leader.Flag("ip", "listening IP adress").Default("localhost").String()
master = app.Command("master", "Start a master process")
masterPort = master.Flag("port", "listening port").Default("8930").Int()
masterIp = master.Flag("ip", "listening IP adress").Default("localhost").String()

agent = app.Command("agent", "Channel Agent")
agentOption = &a.AgentServerOption{
Dir: agent.Flag("dir", "agent folder to store computed data").Default(os.TempDir()).String(),
Port: agent.Flag("port", "agent listening port").Default("8931").Int(),
Leader: agent.Flag("leader", "leader address").Default("localhost:8930").String(),
Master: agent.Flag("master", "master address").Default("localhost:8930").String(),
DataCenter: agent.Flag("dataCenter", "data center name").Default("defaultDataCenter").String(),
Rack: agent.Flag("rack", "rack name").Default("defaultRack").String(),
MaxExecutor: agent.Flag("max.executors", "upper limit of executors").Default(strconv.Itoa(runtime.NumCPU())).Int(),
Expand All @@ -44,14 +44,14 @@ var (

receiver = app.Command("receive", "Receive data from a channel")
receiveFromChanName = receiver.Flag("from", "Name of a source channel").Required().String()
receiverLeader = receiver.Flag("leader", "ip:port format").Default("localhost:8930").String()
receiverMaster = receiver.Flag("master", "ip:port format").Default("localhost:8930").String()
)

func main() {
switch kingpin.MustParse(app.Parse(os.Args[1:])) {
case leader.FullCommand():
println("listening on", (*leaderIp)+":"+strconv.Itoa(*leaderPort))
l.RunLeader((*leaderIp) + ":" + strconv.Itoa(*leaderPort))
case master.FullCommand():
println("listening on", (*masterIp)+":"+strconv.Itoa(*masterPort))
m.RunMaster((*masterIp) + ":" + strconv.Itoa(*masterPort))
case sender.FullCommand():
var wg sync.WaitGroup
sendChan, err := s.NewSendChannel(*sendToChanName, *senderAgentPort, &wg)
Expand Down Expand Up @@ -82,7 +82,7 @@ func main() {

case receiver.FullCommand():
rc := r.NewReceiveChannel(*receiveFromChanName, 0)
recvChan, err := rc.GetDirectChannel(*receiverLeader)
recvChan, err := rc.GetDirectChannel(*receiverMaster)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions resource/service_discovery/client/name_service_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"log"

service "github.com/chrislusf/glow/resource/service_discovery/leader"
"github.com/chrislusf/glow/resource/service_discovery/master"
"github.com/chrislusf/glow/util"
)

Expand All @@ -26,7 +26,7 @@ func (n *NameServiceProxy) Find(name string) (locations []string) {
if err != nil {
log.Printf("Failed to list from %s:%v", l, err)
}
var ret []service.ChannelInformation
var ret []master.ChannelInformation
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
fmt.Printf("%s/list%s unmarshal error:%v, json:%s", l, name, err, string(jsonBlob))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Leader acts as a transient team leader
// It register each service's active locations.
package leader
package master

import (
"encoding/json"
Expand All @@ -14,11 +12,11 @@ import (
"github.com/chrislusf/glow/util"
)

func (tl *TeamLeader) listAgentsHandler(w http.ResponseWriter, r *http.Request) {
util.Json(w, r, http.StatusAccepted, tl.LeaderResource.Topology)
func (tl *TeamMaster) listAgentsHandler(w http.ResponseWriter, r *http.Request) {
util.Json(w, r, http.StatusAccepted, tl.MasterResource.Topology)
}

func (tl *TeamLeader) requestAgentHandler(w http.ResponseWriter, r *http.Request) {
func (tl *TeamMaster) requestAgentHandler(w http.ResponseWriter, r *http.Request) {
requestBlob := []byte(r.FormValue("request"))
var request resource.AllocationRequest
err := json.Unmarshal(requestBlob, &request)
Expand All @@ -40,7 +38,7 @@ func (tl *TeamLeader) requestAgentHandler(w http.ResponseWriter, r *http.Request

}

func (tl *TeamLeader) updateAgentHandler(w http.ResponseWriter, r *http.Request) {
func (tl *TeamMaster) updateAgentHandler(w http.ResponseWriter, r *http.Request) {
servicePortString := r.FormValue("servicePort")
servicePort, err := strconv.Atoi(servicePortString)
if err != nil {
Expand All @@ -65,7 +63,7 @@ func (tl *TeamLeader) updateAgentHandler(w http.ResponseWriter, r *http.Request)

// fmt.Printf("reported allocated: %v\n", alloc)

tl.LeaderResource.UpdateAgentInformation(ai)
tl.MasterResource.UpdateAgentInformation(ai)

w.WriteHeader(http.StatusAccepted)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Leader acts as a transient team leader
// It register each service's active locations.
package leader
package master

import (
"net/http"
Expand All @@ -15,15 +13,15 @@ type ChannelInformation struct {
LastHeartBeat time.Time
}

func (tl *TeamLeader) handleChannel(w http.ResponseWriter, r *http.Request) {
func (tl *TeamMaster) handleChannel(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
tl.updateChannelHandler(w, r)
} else {
tl.listChannelsHandler(w, r)
}
}

func (tl *TeamLeader) listChannelsHandler(w http.ResponseWriter, r *http.Request) {
func (tl *TeamMaster) listChannelsHandler(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path[len("/channel/"):]

freshChannels := make([]*ChannelInformation, 0)
Expand All @@ -47,7 +45,7 @@ func (tl *TeamLeader) listChannelsHandler(w http.ResponseWriter, r *http.Request
}

// put agent information list under a path
func (tl *TeamLeader) updateChannelHandler(w http.ResponseWriter, r *http.Request) {
func (tl *TeamMaster) updateChannelHandler(w http.ResponseWriter, r *http.Request) {
servicePort := r.FormValue("servicePort")
host := r.Host
if strings.Contains(host, ":") {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Leader acts as a transient team leader
// It register each service's active locations.
package leader
package master

import (
"net/http"
Expand All @@ -9,23 +7,23 @@ import (
"github.com/chrislusf/glow/util"
)

type TeamLeader struct {
type TeamMaster struct {
channels map[string][]*ChannelInformation
channelsLock sync.Mutex

LeaderResource *LeaderResource
MasterResource *MasterResource
}

func (tl *TeamLeader) statusHandler(w http.ResponseWriter, r *http.Request) {
func (tl *TeamMaster) statusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
infos["Version"] = "0.001"
util.Json(w, r, http.StatusOK, infos)
}

func RunLeader(listenOn string) {
tl := &TeamLeader{}
func RunMaster(listenOn string) {
tl := &TeamMaster{}
tl.channels = make(map[string][]*ChannelInformation)
tl.LeaderResource = NewLeaderResource()
tl.MasterResource = NewMasterResource()

http.HandleFunc("/", tl.statusHandler)
http.HandleFunc("/agent/assign", tl.requestAgentHandler)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package leader
package master

import (
"fmt"
Expand All @@ -8,7 +8,7 @@ import (
"github.com/chrislusf/glow/resource"
)

func (tl *TeamLeader) allocate(req *resource.AllocationRequest) (result *resource.AllocationResult) {
func (tl *TeamMaster) allocate(req *resource.AllocationRequest) (result *resource.AllocationResult) {
result = &resource.AllocationResult{}
dc, err := tl.findDataCenter(req)
if err != nil {
Expand All @@ -22,7 +22,7 @@ func (tl *TeamLeader) allocate(req *resource.AllocationRequest) (result *resourc
return
}

func (tl *TeamLeader) allocateServersOnRack(dc *resource.DataCenter, rack *resource.Rack, requests []*resource.ComputeRequest) (
func (tl *TeamMaster) allocateServersOnRack(dc *resource.DataCenter, rack *resource.Rack, requests []*resource.ComputeRequest) (
allocated []resource.Allocation, remainingRequests []*resource.ComputeRequest) {
var j = -1
for _, agent := range rack.Agents {
Expand All @@ -49,7 +49,7 @@ func (tl *TeamLeader) allocateServersOnRack(dc *resource.DataCenter, rack *resou
agent.Allocated = agent.Allocated.Plus(request.ComputeResource)
rack.Allocated = rack.Allocated.Plus(request.ComputeResource)
dc.Allocated = dc.Allocated.Plus(request.ComputeResource)
tl.LeaderResource.Topology.Allocated = tl.LeaderResource.Topology.Allocated.Plus(request.ComputeResource)
tl.MasterResource.Topology.Allocated = tl.MasterResource.Topology.Allocated.Plus(request.ComputeResource)
available = available.Minus(request.ComputeResource)
hasAllocation = true
} else {
Expand All @@ -60,7 +60,7 @@ func (tl *TeamLeader) allocateServersOnRack(dc *resource.DataCenter, rack *resou
return
}

func (tl *TeamLeader) findServers(dc *resource.DataCenter, req *resource.AllocationRequest) (ret []resource.Allocation) {
func (tl *TeamMaster) findServers(dc *resource.DataCenter, req *resource.AllocationRequest) (ret []resource.Allocation) {
// sort racks by unallocated resources
racks := make([]*resource.Rack, 0, len(dc.Racks))
for _, rack := range dc.Racks {
Expand All @@ -84,7 +84,7 @@ func (tl *TeamLeader) findServers(dc *resource.DataCenter, req *resource.Allocat
return
}

func (tl *TeamLeader) findDataCenter(req *resource.AllocationRequest) (*resource.DataCenter, error) {
func (tl *TeamMaster) findDataCenter(req *resource.AllocationRequest) (*resource.DataCenter, error) {
// calculate total resource requested
var totalComputeResource resource.ComputeResource
for _, cr := range req.Requests {
Expand All @@ -100,21 +100,21 @@ func (tl *TeamLeader) findDataCenter(req *resource.AllocationRequest) (*resource
}
}
if dcName != "" {
dc, hasDc := tl.LeaderResource.Topology.DataCenters[dcName]
dc, hasDc := tl.MasterResource.Topology.DataCenters[dcName]
if !hasDc {
return nil, fmt.Errorf("Failed to find existing data center: %s", dcName)
}
return dc, nil
}

if len(tl.LeaderResource.Topology.DataCenters) == 0 {
if len(tl.MasterResource.Topology.DataCenters) == 0 {
return nil, fmt.Errorf("No data centers found.")
}

// weighted reservior sampling
var selectedDc *resource.DataCenter
var seenWeight int64
for _, dc := range tl.LeaderResource.Topology.DataCenters {
for _, dc := range tl.MasterResource.Topology.DataCenters {
available := dc.Resource.Minus(dc.Allocated)
weight := available.MemoryMB
if weight > 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package leader
package master

import (
"fmt"
Expand All @@ -9,7 +9,7 @@ import (

func TestAllocation1(t *testing.T) {

lr := NewLeaderResource()
lr := NewMasterResource()
lr.UpdateAgentInformation(&resource.AgentInformation{
Location: resource.Location{
DataCenter: "dc1",
Expand Down Expand Up @@ -71,9 +71,9 @@ func TestAllocation1(t *testing.T) {
},
}

tl := &TeamLeader{}
tl := &TeamMaster{}
tl.channels = make(map[string][]*ChannelInformation)
tl.LeaderResource = lr
tl.MasterResource = lr

result := tl.allocate(req)
t.Logf("Result: %+v", result)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestAllocation1(t *testing.T) {

func TestAllocation2(t *testing.T) {

lr := NewLeaderResource()
lr := NewMasterResource()
lr.UpdateAgentInformation(&resource.AgentInformation{
Location: resource.Location{
DataCenter: "dc1",
Expand All @@ -137,9 +137,9 @@ func TestAllocation2(t *testing.T) {
},
})

tl := &TeamLeader{}
tl := &TeamMaster{}
tl.channels = make(map[string][]*ChannelInformation)
tl.LeaderResource = lr
tl.MasterResource = lr

req := &resource.AllocationRequest{
Requests: []resource.ComputeRequest{
Expand Down
Loading

0 comments on commit bed7828

Please sign in to comment.