Merge branch 'master' of https://github.com/chrislusf/glow
chrislusf committed Jan 16, 2016
2 parents a5ac99b + a8e0e83 commit 3790abd
Showing 11 changed files with 285 additions and 146 deletions.
23 changes: 13 additions & 10 deletions driver/context_driver.go
@@ -44,6 +44,9 @@ func init() {

type FlowContextDriver struct {
option *DriverOption

stepGroups []*plan.StepGroup
taskGroups []*plan.TaskGroup
}

func NewFlowContextDriver(option *DriverOption) *FlowContextDriver {
@@ -59,18 +62,18 @@ func (fcd *FlowContextDriver) IsDriverPlotMode() bool {
}

func (fcd *FlowContextDriver) Plot(fc *flow.FlowContext) {
-	taskGroups := plan.GroupTasks(fc)
-	plan.PlotGraph(taskGroups, fc)
+	_, fcd.taskGroups = plan.GroupTasks(fc)
+	plan.PlotGraph(fcd.taskGroups, fc)
}

// the driver runs locally, controlling all tasks
func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

// task fusion to minimize disk IO
-	taskGroups := plan.GroupTasks(fc)
+	fcd.stepGroups, fcd.taskGroups = plan.GroupTasks(fc)
// plot the execution graph
if fcd.option.PlotOutput {
-		plan.PlotGraph(taskGroups, fc)
+		plan.PlotGraph(fcd.taskGroups, fc)
return
}

@@ -97,22 +100,22 @@ func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

// best effort to clean data on agent disk
// this may need more improvements
-	defer fcd.Cleanup(sched, fc, taskGroups)
+	defer fcd.Cleanup(sched, fc)

go sched.EventLoop()

flow.OnInterrupt(func() {
-		fcd.OnInterrupt(fc, taskGroups, sched)
+		fcd.OnInterrupt(fc, sched)
})

// schedule to run the steps
var wg sync.WaitGroup
-	for _, taskGroup := range taskGroups {
+	for _, taskGroup := range fcd.taskGroups {
wg.Add(1)
sched.EventChan <- scheduler.SubmitTaskGroup{
FlowContext: fc,
TaskGroup: taskGroup,
-			Bid: fcd.option.FlowBid / float64(len(taskGroups)),
+			Bid: fcd.option.FlowBid / float64(len(fcd.taskGroups)),
WaitGroup: &wg,
}
}
@@ -124,12 +127,12 @@ func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

}

-func (fcd *FlowContextDriver) Cleanup(sched *scheduler.Scheduler, fc *flow.FlowContext, taskGroups []*plan.TaskGroup) {
+func (fcd *FlowContextDriver) Cleanup(sched *scheduler.Scheduler, fc *flow.FlowContext) {
var wg sync.WaitGroup
wg.Add(1)
sched.EventChan <- scheduler.ReleaseTaskGroupInputs{
FlowContext: fc,
-		TaskGroups: taskGroups,
+		TaskGroups: fcd.taskGroups,
WaitGroup: &wg,
}

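Note on the Run loop above: each SubmitTaskGroup bids the flow's total bid split evenly across the task groups. A self-contained toy illustration of that arithmetic (the values and variable names below are made up, not glow's):

package main

import "fmt"

func main() {
	flowBid := 100.0 // stands in for fcd.option.FlowBid
	groupCount := 4  // stands in for len(fcd.taskGroups)
	perGroupBid := flowBid / float64(groupCount)
	fmt.Println("per-group bid:", perGroupBid) // 25
}
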
93 changes: 92 additions & 1 deletion driver/context_driver_on_interrupt.go
@@ -1,13 +1,104 @@
package driver

import (
"fmt"
"sync"
"time"

"github.com/chrislusf/glow/driver/plan"
"github.com/chrislusf/glow/driver/scheduler"
"github.com/chrislusf/glow/flow"
"github.com/chrislusf/glow/resource"
"github.com/chrislusf/glow/util"
)

func (fcd *FlowContextDriver) OnInterrupt(
fc *flow.FlowContext,
-	taskGroups []*plan.TaskGroup,
sched *scheduler.Scheduler) {
status := fcd.collectStatusFromRemoteExecutors(sched)
fcd.printDistributedStatus(sched, status)
}

func (fcd *FlowContextDriver) printDistributedStatus(sched *scheduler.Scheduler, stats []*RemoteExecutorStatus) {
fmt.Print("\n")
for _, stepGroup := range fcd.stepGroups {
fmt.Print("step:")
for _, step := range stepGroup.Steps {
fmt.Printf(" %s%d", step.Name, step.Id)
}
fmt.Print("\n")

for _, tg := range stepGroup.TaskGroups {
stat := stats[tg.Id]
firstTask := tg.Tasks[0]
if stat.Closed() {
fmt.Printf(" %s taskId:%d time:%v completed %d\n", stat.Allocation.Location.URL(), firstTask.Id, stat.TimeTaken(), 0)
} else {
fmt.Printf(" %s taskId:%d time:%v processed %d\n", stat.Allocation.Location.URL(), firstTask.Id, stat.TimeTaken(), 0)
}
}

}
fmt.Print("\n")
}

type RemoteExecutorStatus struct {
ExecutorStatus
Allocation resource.Allocation
taskGroup *plan.TaskGroup
}

func (fcd *FlowContextDriver) collectStatusFromRemoteExecutors(sched *scheduler.Scheduler) []*RemoteExecutorStatus {
stats := make([]*RemoteExecutorStatus, len(fcd.taskGroups))
var wg sync.WaitGroup
for _, tg := range fcd.taskGroups {
wg.Add(1)
go func(tg *plan.TaskGroup) {
defer wg.Done()

requestId := tg.RequestId
request, ok := sched.RemoteExecutorStatuses[requestId]
if !ok {
fmt.Printf("No executors for %v\n", tg)
return
}
stat, err := askExecutorStatusForRequest(request.Allocation.Location.URL(), requestId)
if err != nil {
fmt.Printf("Error to request status from %s: %v\n", request.Allocation.Location.URL(), err)
return
}
stat.Allocation = request.Allocation
stat.taskGroup = tg
stats[tg.Id] = stat
}(tg)
}
wg.Wait()
return stats
}

func askExecutorStatusForRequest(server string, requestId int32) (*RemoteExecutorStatus, error) {

reply, err := scheduler.RemoteDirectCommand(server, scheduler.NewGetStatusRequest(requestId))
if err != nil {
return nil, err
}

response := reply.GetGetStatusResponse()

	// Assumed repair: the hunk as committed left inputStatuses and the loop over
	// response.GetInputStatuses() unfinished; as a placeholder, report the
	// aggregate input length as a single ChannelStatus so the field types line up.
	inputStatuses := []*util.ChannelStatus{
		{Length: response.GetInputLength()},
	}

	return &RemoteExecutorStatus{
		ExecutorStatus: ExecutorStatus{
			InputChannelStatuses: inputStatuses,
OutputChannelStatus: &util.ChannelStatus{
Length: response.GetOutputLength(),
},
ReadyTime: time.Unix(response.GetReadyTime(), 0),
StartTime: time.Unix(response.GetStartTime(), 0),
StopTime: time.Unix(response.GetStopTime(), 0),
},
}, nil
}
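For readers skimming the new file: collectStatusFromRemoteExecutors fans out one goroutine per task group and has each goroutine write only into its own slot of a preallocated slice, so no extra locking is needed around stats. A self-contained toy sketch of that pattern (not glow code; the types and names are invented for illustration):

package main

import (
	"fmt"
	"sync"
)

type status struct{ id int }

func main() {
	ids := []int{0, 1, 2, 3}
	stats := make([]*status, len(ids)) // one slot per id, preallocated

	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			stats[id] = &status{id: id} // each goroutine writes a distinct index
		}(id)
	}
	wg.Wait()
	fmt.Println(len(stats), "statuses collected")
}
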
26 changes: 26 additions & 0 deletions driver/executor_status.go
@@ -0,0 +1,26 @@
package driver

import (
"time"

"github.com/chrislusf/glow/util"
)

type ExecutorStatus struct {
InputChannelStatuses []*util.ChannelStatus
OutputChannelStatus *util.ChannelStatus
ReadyTime time.Time
StartTime time.Time
StopTime time.Time
}

func (s *ExecutorStatus) Closed() bool {
return !s.StopTime.IsZero()
}

func (s *ExecutorStatus) TimeTaken() time.Duration {
if s.Closed() {
return s.StopTime.Sub(s.ReadyTime)
}
return time.Now().Sub(s.ReadyTime)
}
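A small self-contained sketch of how the two helpers behave, using a trimmed copy of the struct above (the timestamps are made up):

package main

import (
	"fmt"
	"time"
)

// Trimmed copy of driver.ExecutorStatus, just enough for the two helpers.
type ExecutorStatus struct {
	ReadyTime time.Time
	StopTime  time.Time
}

func (s *ExecutorStatus) Closed() bool {
	return !s.StopTime.IsZero()
}

func (s *ExecutorStatus) TimeTaken() time.Duration {
	if s.Closed() {
		return s.StopTime.Sub(s.ReadyTime)
	}
	return time.Now().Sub(s.ReadyTime)
}

func main() {
	ready := time.Now().Add(-90 * time.Second)
	running := &ExecutorStatus{ReadyTime: ready}                        // zero StopTime: still running
	finished := &ExecutorStatus{ReadyTime: ready, StopTime: time.Now()} // StopTime set: closed
	fmt.Println(running.Closed(), finished.Closed())     // false true
	fmt.Println(finished.TimeTaken().Round(time.Second)) // 1m30s
}
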
16 changes: 3 additions & 13 deletions driver/plan/group.go
@@ -7,22 +7,12 @@ import (
"github.com/chrislusf/glow/flow"
)

-type TaskGroupStatus int
-
-const (
-	New TaskGroupStatus = iota
-	Waiting
-	Scheduled
-	Completed
-	Failed
-)

type TaskGroup struct {
Id int
Tasks []*flow.Task
Parents []*TaskGroup
ParentStepGroup *StepGroup
-	Status TaskGroupStatus
+	RequestId int32 // id for actual request when running
}

type StepGroup struct {
@@ -31,9 +21,9 @@ type StepGroup struct {
TaskGroups []*TaskGroup
}

-func GroupTasks(fc *flow.FlowContext) []*TaskGroup {
+func GroupTasks(fc *flow.FlowContext) ([]*StepGroup, []*TaskGroup) {
	stepGroups := translateToStepGroups(fc)
-	return translateToTaskGroups(stepGroups)
+	return stepGroups, translateToTaskGroups(stepGroups)
}

func isMergeableDataset(ds *flow.Dataset, taskCount int) bool {
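With GroupTasks now returning both groupings, callers either keep both (as Run does above) or discard the step groups with the blank identifier (as Plot does). A toy stand-in showing the two call shapes (not glow code):

package main

import "fmt"

// Stand-in with the same two-value shape as the new plan.GroupTasks.
func groupTasks() (stepGroups []string, taskGroups []string) {
	return []string{"sg-0"}, []string{"tg-0", "tg-1"}
}

func main() {
	sgs, tgs := groupTasks() // keep both, like Run
	fmt.Println(len(sgs), len(tgs))

	_, onlyTGs := groupTasks() // task groups only, like Plot
	fmt.Println(len(onlyTGs))
}
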
9 changes: 9 additions & 0 deletions driver/scheduler/command_execution.go
@@ -44,6 +44,15 @@ func NewStartRequest(path string, dir string, args []string, allocated resource.
return request
}

func NewGetStatusRequest(requestId int32) *cmd.ControlMessage {
return &cmd.ControlMessage{
Type: cmd.ControlMessage_GetStatusRequest.Enum(),
GetStatusRequest: &cmd.GetStatusRequest{
StartRequestHash: proto.Int32(requestId),
},
}
}

func NewDeleteDatasetShardRequest(name string) *cmd.ControlMessage {
return &cmd.ControlMessage{
Type: cmd.ControlMessage_DeleteDatasetShardRequest.Enum(),
14 changes: 9 additions & 5 deletions driver/scheduler/scheduler.go
@@ -5,7 +5,9 @@ import (
"sync"
"time"

"github.com/chrislusf/glow/driver/cmd"
"github.com/chrislusf/glow/driver/scheduler/market"
"github.com/chrislusf/glow/resource"
)

type Scheduler struct {
@@ -20,6 +22,8 @@ type Scheduler struct {
}

type RemoteExecutorStatus struct {
Request *cmd.ControlMessage
Allocation resource.Allocation
RequestTime time.Time
InputLength int
OutputLength int
@@ -51,15 +55,15 @@ func NewScheduler(leader string, option *SchedulerOption) *Scheduler {
return s
}

-func (s *Scheduler) getRemoteExecutorStatus(id int32) *RemoteExecutorStatus {
+func (s *Scheduler) getRemoteExecutorStatus(id int32) (status *RemoteExecutorStatus, isOld bool) {
s.Lock()
defer s.Unlock()

-	status, ok := s.RemoteExecutorStatuses[id]
-	if ok {
-		return status
+	status, isOld = s.RemoteExecutorStatuses[id]
+	if isOld {
+		return status, isOld
}
status = &RemoteExecutorStatus{}
s.RemoteExecutorStatuses[id] = status
-	return status
+	return status, false
}
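The reworked lookup now tells the caller whether the status entry already existed. A self-contained toy sketch of that get-or-create-under-lock pattern (the types and names below are invented; only the shape mirrors getRemoteExecutorStatus):

package main

import (
	"fmt"
	"sync"
)

type executorStatus struct{ inputLength int }

type statusRegistry struct {
	sync.Mutex
	statuses map[int32]*executorStatus
}

// Look up under the lock, create on a miss, and report whether it already existed.
func (r *statusRegistry) get(id int32) (status *executorStatus, isOld bool) {
	r.Lock()
	defer r.Unlock()

	status, isOld = r.statuses[id]
	if isOld {
		return status, true
	}
	status = &executorStatus{}
	r.statuses[id] = status
	return status, false
}

func main() {
	r := &statusRegistry{statuses: map[int32]*executorStatus{}}
	_, old := r.get(7)
	fmt.Println(old) // false: first lookup creates the entry
	_, old = r.get(7)
	fmt.Println(old) // true: the entry already existed
}
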
(The remaining 5 of the 11 changed files did not load and are not shown here.)
