Skip to content

Commit

Permalink
working resource allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Sep 30, 2015
1 parent ecdf5fa commit cef6f56
Show file tree
Hide file tree
Showing 44 changed files with 1,434 additions and 391 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
glow

examples/word_count/word_count

*.dat
38 changes: 27 additions & 11 deletions agent/agent_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"fmt"
"log"
"net"
"net/url"
"os"
"strconv"
"sync"

"github.com/chrislusf/glow/agent/store"
"github.com/chrislusf/glow/driver/cmd"
"github.com/chrislusf/glow/resource"
"github.com/chrislusf/glow/service_discovery/client"
"github.com/chrislusf/glow/resource/service_discovery/client"
"github.com/chrislusf/glow/util"
"github.com/golang/protobuf/proto"
)
Expand All @@ -34,7 +35,19 @@ func (ds *LiveDataStore) Destroy() {
ds.store.Destroy()
}

// AgentServerOption bundles the settings used to construct an AgentServer.
//
// Every field is a pointer. NewAgentServer dereferences Leader, Port, Dir,
// MaxExecutor, CPULevel and MemoryMB without a nil check, and the heartbeat
// callback set up in AgentServer.Run dereferences DataCenter and Rack, so all
// fields must be non-nil before the option is used.
// NOTE(review): the pointer types look intended for binding to command-line
// flag values — confirm against the caller.
type AgentServerOption struct {
// Leader is copied into AgentServer.leader and handed to the heartbeater.
Leader *string
// Port is copied into AgentServer.Port.
Port *int
// Dir is copied into AgentServer.dir.
Dir *string
// DataCenter is added to the agent heartbeat values under the key "dataCenter".
DataCenter *string
// Rack is added to the agent heartbeat values under the key "rack".
Rack *string
// MaxExecutor is used as the CPUCount of the agent's ComputeResource.
MaxExecutor *int
// MemoryMB is used as the MemoryMB of the agent's ComputeResource.
MemoryMB *int64
// CPULevel is used as the CPULevel of the agent's ComputeResource.
CPULevel *int
}

type AgentServer struct {
Option *AgentServerOption
leader string
Port int
name2Store map[string]*LiveDataStore
Expand All @@ -45,18 +58,17 @@ type AgentServer struct {
computeResource resource.ComputeResource
}

func NewAgentServer(dir string, port int, leader string,
dataCenter, rack string,
maxExecutor int, cpuLevel int, memorySizeMB int) *AgentServer {
func NewAgentServer(option *AgentServerOption) *AgentServer {
as := &AgentServer{
leader: leader,
Port: port,
dir: dir,
Option: option,
leader: *option.Leader,
Port: *option.Port,
dir: *option.Dir,
name2Store: make(map[string]*LiveDataStore),
computeResource: resource.ComputeResource{
CPUCount: maxExecutor,
CPULevel: cpuLevel,
MemorySizeMB: memorySizeMB,
CPUCount: *option.MaxExecutor,
CPULevel: *option.CPULevel,
MemoryMB: *option.MemoryMB,
},
}

Expand Down Expand Up @@ -85,7 +97,11 @@ func (r *AgentServer) Init() (err error) {
func (as *AgentServer) Run() {
//register agent
killHeartBeaterChan := make(chan bool, 1)
go client.NewHeartBeater("a1", as.Port, as.leader).StartHeartBeat(killHeartBeaterChan)
go client.NewHeartBeater(as.Port, as.leader).StartAgentHeartBeat(killHeartBeaterChan, func(values url.Values) {
as.computeResource.AddToValues(values)
values.Add("dataCenter", *as.Option.DataCenter)
values.Add("rack", *as.Option.Rack)
})

for {
// Listen for an incoming connection.
Expand Down
4 changes: 2 additions & 2 deletions agent/agent_server_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"log"

"github.com/chrislusf/glow/agent/store"
"github.com/chrislusf/glow/service_discovery/client"
"github.com/chrislusf/glow/resource/service_discovery/client"
"github.com/chrislusf/glow/util"
)

Expand All @@ -24,7 +24,7 @@ func (as *AgentServer) handleWriteConnection(r io.Reader, name string) {
ds = as.name2Store[name]

//register stream
go client.NewHeartBeater(name, as.Port, "localhost:8930").StartHeartBeat(ds.killHeartBeater)
go client.NewHeartBeater(as.Port, "localhost:8930").StartChannelHeartBeat(ds.killHeartBeater, name)
}
as.name2StoreLock.Unlock()

Expand Down
26 changes: 19 additions & 7 deletions driver/context_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ import (

"github.com/chrislusf/glow/driver/rsync"
"github.com/chrislusf/glow/driver/scheduler"
"github.com/chrislusf/glow/flame"
"github.com/chrislusf/glow/flow"
)

// DriverOption holds the driver settings that this package's init function
// binds to command-line flags.
type DriverOption struct {
// ShouldStart is bound to the "driver" flag (default false): start in driver mode.
ShouldStart bool
// Leader is bound to "driver.leader" (default "localhost:8930") and is the
// leader address passed to scheduler.NewScheduler.
Leader string
// DataCenter is bound to "driver.dataCenter" (default "defaultDataCenter")
// and forwarded as SchedulerOption.DataCenter.
DataCenter string
// Rack is bound to "driver.rack" (default "defaultRack") and forwarded as
// SchedulerOption.Rack.
Rack string
}

func init() {
var driverOption DriverOption
flag.BoolVar(&driverOption.ShouldStart, "driver", false, "start in driver mode")
flag.StringVar(&driverOption.Leader, "driver.leader", "localhost:8930", "leader server")
flag.StringVar(&driverOption.DataCenter, "driver.dataCenter", "defaultDataCenter", "preferred data center name")
flag.StringVar(&driverOption.Rack, "driver.rack", "defaultRack", "preferred rack name")

flame.RegisterContextRunner(NewFlowContextDriver(&driverOption))
flow.RegisterContextRunner(NewFlowContextDriver(&driverOption))
}

type FlowContextDriver struct {
Expand All @@ -36,26 +40,34 @@ func (fcd *FlowContextDriver) IsDriverMode() bool {
}

// The driver runs locally, controlling all tasks.
func (fcd *FlowContextDriver) Run(fc *flame.FlowContext) {
func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

// rsyncServer :=
rsync.NewRsyncServer(os.Args[0])
// rsyncServer.Start()

taskGroups := scheduler.GroupTasks(fc)

sched := scheduler.NewScheduler(fcd.option.Leader)
go sched.Loop()
sched := scheduler.NewScheduler(
fcd.option.Leader,
&scheduler.SchedulerOption{
DataCenter: fcd.option.DataCenter,
Rack: fcd.option.Rack,
},
)
go sched.EventLoop()

// schedule to run the steps
var wg sync.WaitGroup
for _, taskGroup := range taskGroups {
wg.Add(1)
for i, taskGroup := range taskGroups {
sched.EventChan <- scheduler.SubmittedTaskGroup{
FlowContext: fc,
TaskGroup: taskGroup,
Bid: len(taskGroups) - i,
WaitGroup: &wg,
}
}
go sched.Market.FetcherLoop()

wg.Wait()
}
4 changes: 2 additions & 2 deletions driver/network_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"flag"
"sync"

"github.com/chrislusf/glow/receiver"
"github.com/chrislusf/glow/sender"
"github.com/chrislusf/glow/io/receiver"
"github.com/chrislusf/glow/io/sender"
)

var ()
Expand Down
28 changes: 23 additions & 5 deletions driver/scheduler/command_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/chrislusf/glow/driver/cmd"
"github.com/chrislusf/glow/service_discovery/client"
"github.com/chrislusf/glow/resource/service_discovery/client"
"github.com/chrislusf/glow/util"
"github.com/golang/protobuf/proto"
)
Expand All @@ -24,13 +24,28 @@ func NewStartRequest(path string, dir string, args ...string) *cmd.ControlMessag
}
}

// RemoteDirectExecute opens a command connection straight to server, runs
// command over it via doExecute, and closes the connection before returning.
// A connection failure is returned unchanged; otherwise the result of
// doExecute is returned.
func RemoteDirectExecute(server string, command *cmd.ControlMessage) error {
	link, dialErr := getDirectCommandConnection(server)
	if dialErr != nil {
		return dialErr
	}
	// Release the connection once the command exchange has finished.
	defer link.Close()

	execErr := doExecute(link, command)
	return execErr
}

// RemoteExecute obtains a command connection for agentName by way of the
// leader, runs command over it via doExecute, and closes the connection
// before returning. A connection failure is returned unchanged; otherwise the
// result of doExecute is returned.
func RemoteExecute(leader string, agentName string, command *cmd.ControlMessage) error {
	link, dialErr := getCommandConnection(leader, agentName)
	if dialErr != nil {
		return dialErr
	}
	// Release the connection once the command exchange has finished.
	defer link.Close()

	execErr := doExecute(link, command)
	return execErr
}

func doExecute(conn net.Conn, command *cmd.ControlMessage) error {

buf := make([]byte, 4)

// serialize the command
Expand All @@ -41,7 +56,7 @@ func RemoteExecute(leader string, agentName string, command *cmd.ControlMessage)

// send the command
if err = util.WriteData(conn, buf, []byte("CMD "), data); err != nil {
println("failed to write to", agentName, ":", err.Error())
println("failed to write to", conn.RemoteAddr().String(), ":", err.Error())
return err
}

Expand All @@ -50,18 +65,17 @@ func RemoteExecute(leader string, agentName string, command *cmd.ControlMessage)
// read output and print it to stdout
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
fmt.Printf("%s>%s\n", agentName, scanner.Text())
fmt.Printf("%s>%s\n", "", scanner.Text())
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("Failed to scan output: %v", err)
}

return err

}

func getCommandConnection(leader string, agentName string) (net.Conn, error) {
l := client.NewNameServiceAgent(leader)
l := client.NewNameServiceProxy(leader)

// looking for the agentName
var target string
Expand All @@ -78,6 +92,10 @@ func getCommandConnection(leader string, agentName string) (net.Conn, error) {
}
}

return getDirectCommandConnection(target)
}

func getDirectCommandConnection(target string) (net.Conn, error) {
// connect to a TCP server
network := "tcp"
raddr, err := net.ResolveTCPAddr(network, target)
Expand Down
3 changes: 0 additions & 3 deletions driver/scheduler/events.go

This file was deleted.

24 changes: 12 additions & 12 deletions driver/scheduler/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package scheduler
import (
"log"

"github.com/chrislusf/glow/flame"
"github.com/chrislusf/glow/flow"
)

type TaskGroupStatus int
Expand All @@ -18,36 +18,36 @@ const (

type TaskGroup struct {
Id int
Tasks []*flame.Task
Tasks []*flow.Task
Parents []*TaskGroup
ParentStepGroup *StepGroup
Status TaskGroupStatus
}

type StepGroup struct {
Steps []*flame.Step
Steps []*flow.Step
Parents []*StepGroup
TaskGroups []*TaskGroup
}

func GroupTasks(fc *flame.FlowContext) []*TaskGroup {
func GroupTasks(fc *flow.FlowContext) []*TaskGroup {
stepGroups := translateToStepGroups(fc)
return translateToTaskGroups(stepGroups)
}

func findAncestorStepId(step *flame.Step) (int, bool) {
func findAncestorStepId(step *flow.Step) (int, bool) {
current := step
taskCount := len(step.Tasks)
var next *flame.Step
for current.Type == flame.Local && taskCount == len(current.Tasks) {
var next *flow.Step
for current.Type == flow.Local && taskCount == len(current.Tasks) {
if len(current.Inputs) > 1 {
log.Panic("local step should not have more than 1 input")
}
if len(current.Inputs) == 0 {
break
}
next = current.Inputs[0].Step
if next.Type != flame.Local || taskCount != len(next.Tasks) {
if next.Type != flow.Local || taskCount != len(next.Tasks) {
break
}
current = next
Expand All @@ -56,7 +56,7 @@ func findAncestorStepId(step *flame.Step) (int, bool) {
}

// group local steps into one step group
func translateToStepGroups(fc *flame.FlowContext) []*StepGroup {
func translateToStepGroups(fc *flow.FlowContext) []*StepGroup {
// use array instead of map to ensure consistent ordering
stepId2StepGroup := make([]*StepGroup, len(fc.Steps))
for _, step := range fc.Steps {
Expand Down Expand Up @@ -117,7 +117,7 @@ func translateToTaskGroups(stepId2StepGroup []*StepGroup) (ret []*TaskGroup) {
return
}

func assertSameNumberOfTasks(steps []*flame.Step) {
func assertSameNumberOfTasks(steps []*flow.Step) {
if len(steps) == 0 {
return
}
Expand All @@ -133,7 +133,7 @@ func NewStepGroup() *StepGroup {
return &StepGroup{}
}

func (t *StepGroup) AddStep(Step *flame.Step) *StepGroup {
func (t *StepGroup) AddStep(Step *flow.Step) *StepGroup {
t.Steps = append(t.Steps, Step)
return t
}
Expand All @@ -147,7 +147,7 @@ func NewTaskGroup() *TaskGroup {
return &TaskGroup{}
}

func (t *TaskGroup) AddTask(task *flame.Task) *TaskGroup {
func (t *TaskGroup) AddTask(task *flow.Task) *TaskGroup {
t.Tasks = append(t.Tasks, task)
return t
}
Expand Down
Loading

0 comments on commit cef6f56

Please sign in to comment.