Skip to content

Commit

Permalink
WIP to improve distributed process status. Check in to fix compilatio…
Browse files Browse the repository at this point in the history
…n error first
  • Loading branch information
chrislusf committed Jan 18, 2016
1 parent 928ac52 commit c8dea04
Show file tree
Hide file tree
Showing 15 changed files with 181 additions and 106 deletions.
5 changes: 4 additions & 1 deletion agent/agent_server_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ func (as *AgentServer) handleCommandConnection(conn net.Conn,
reply.DeleteDatasetShardResponse = as.handleDeleteDatasetShard(conn, command.DeleteDatasetShardRequest)
case cmd.ControlMessage_GetStatusRequest:
reply.Type = cmd.ControlMessage_GetStatusResponse.Enum()
reply.GetStatusResponse = as.handleStatus(command.GetStatusRequest)
reply.GetStatusResponse = as.handleGetStatusRequest(command.GetStatusRequest)
case cmd.ControlMessage_StopRequest:
reply.Type = cmd.ControlMessage_StopResponse.Enum()
reply.StopResponse = as.handleStopRequest(command.StopRequest)
case cmd.ControlMessage_LocalStatusReportRequest:
reply.Type = cmd.ControlMessage_LocalStatusReportResponse.Enum()
reply.LocalStatusReportResponse = as.handleLocalStatusReportRequest(command.LocalStatusReportRequest)
}
return reply
}
9 changes: 8 additions & 1 deletion agent/agent_server_executor_start.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"fmt"
"log"
"net"
"os"
Expand Down Expand Up @@ -41,7 +42,7 @@ func (as *AgentServer) handleStart(conn net.Conn,
stat.StartTime = time.Now()
cmd := exec.Command(
startRequest.GetPath(),
startRequest.Args...,
adjustArgs(startRequest.Args, startRequest.GetHashCode())...,
)
cmd.Env = startRequest.Envs
cmd.Dir = dir
Expand All @@ -65,6 +66,12 @@ func (as *AgentServer) handleStart(conn net.Conn,
return reply
}

// adjustArgs returns the executor's command-line arguments followed by the
// "-glow.request.id" flag and the request id rendered in decimal.
//
// The result is built in a freshly allocated slice, so args is never written
// to. A plain append(args, ...) would write into args' backing array whenever
// it has spare capacity, silently clobbering whatever the caller keeps there.
func adjustArgs(args []string, requestId uint32) (ret []string) {
	// Two extra slots: the flag name and its value.
	ret = make([]string, 0, len(args)+2)
	ret = append(ret, args...)
	ret = append(ret, "-glow.request.id", fmt.Sprintf("%d", requestId))
	return ret
}

func (as *AgentServer) plusAllocated(allocated resource.ComputeResource) {
as.allocatedResourceLock.Lock()
defer as.allocatedResourceLock.Unlock()
Expand Down
24 changes: 21 additions & 3 deletions agent/agent_server_executor_status.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package agent

import (
"time"

"github.com/chrislusf/glow/driver"
"github.com/chrislusf/glow/driver/cmd"
"github.com/golang/protobuf/proto"
)

func (as *AgentServer) handleStatus(getStatusRequest *cmd.GetStatusRequest) *cmd.GetStatusResponse {
func (as *AgentServer) handleGetStatusRequest(getStatusRequest *cmd.GetStatusRequest) *cmd.GetStatusResponse {
requestId := getStatusRequest.GetStartRequestHash()
stat := as.localExecutorManager.getExecutorStatus(requestId)

reply := &cmd.GetStatusResponse{
StartRequestHash: proto.Int32(requestId),
StartRequestHash: proto.Uint32(requestId),
InputStatuses: driver.ToProto(stat.InputChannelStatuses),
OutputStatuses: driver.ToProto(stat.OutputChannelStatuses),
RequestTime: proto.Int64(stat.RequestTime.Unix()),
StartTime: proto.Int64(stat.StartTime.Unix()),
StopTime: proto.Int64(stat.StopTime.Unix()),
Expand All @@ -19,6 +24,19 @@ func (as *AgentServer) handleStatus(getStatusRequest *cmd.GetStatusRequest) *cmd
return reply
}

// handleLocalStatusReportRequest stores the channel statuses carried by
// localStatusRequest on the executor status entry keyed by the request's
// start request hash, refreshes that entry's LastAccessTime, and answers
// with an empty LocalStatusReportResponse.
func (as *AgentServer) handleLocalStatusReportRequest(localStatusRequest *cmd.LocalStatusReportRequest) *cmd.LocalStatusReportResponse {
	executorStatus := as.localExecutorManager.getExecutorStatus(localStatusRequest.GetStartRequestHash())

	// Replace the previously known statuses with the freshly reported ones.
	executorStatus.InputChannelStatuses = driver.FromProto(localStatusRequest.GetInputStatuses())
	executorStatus.OutputChannelStatuses = driver.FromProto(localStatusRequest.GetOutputStatuses())
	executorStatus.LastAccessTime = time.Now()

	return &cmd.LocalStatusReportResponse{}
}

func (as *AgentServer) handleStopRequest(stopRequest *cmd.StopRequest) *cmd.StopResponse {
requestId := stopRequest.GetStartRequestHash()
stat := as.localExecutorManager.getExecutorStatus(requestId)
Expand All @@ -29,7 +47,7 @@ func (as *AgentServer) handleStopRequest(stopRequest *cmd.StopRequest) *cmd.Stop
}

reply := &cmd.StopResponse{
StartRequestHash: proto.Int32(requestId),
StartRequestHash: proto.Uint32(requestId),
}

return reply
Expand Down
8 changes: 4 additions & 4 deletions agent/local_executor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ import (

type LocalExecutorManager struct {
sync.Mutex
id2ExecutorStatus map[int32]*ExecutorStatus
id2ExecutorStatus map[uint32]*AgentExecutorStatus
}

func newLocalExecutorsManager() *LocalExecutorManager {
m := &LocalExecutorManager{
id2ExecutorStatus: make(map[int32]*ExecutorStatus),
id2ExecutorStatus: make(map[uint32]*AgentExecutorStatus),
}
go m.purgeExpiredEntries()
return m
}

func (m *LocalExecutorManager) getExecutorStatus(id int32) *ExecutorStatus {
func (m *LocalExecutorManager) getExecutorStatus(id uint32) *AgentExecutorStatus {
m.Lock()
defer m.Unlock()
executorStatus, ok := m.id2ExecutorStatus[id]
if ok {
return executorStatus
}

executorStatus = &ExecutorStatus{LastAccessTime: time.Now()}
executorStatus = &AgentExecutorStatus{LastAccessTime: time.Now()}
m.id2ExecutorStatus[id] = executorStatus

return executorStatus
Expand Down
14 changes: 4 additions & 10 deletions agent/local_executor_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ package agent
import (
"os"
"time"

"github.com/chrislusf/glow/util"
)

type ExecutorStatus struct {
type AgentExecutorStatus struct {
util.ExecutorStatus
RequestHash int32
RequestTime time.Time
InputLength int
OutputLength int
StartTime time.Time
StopTime time.Time
Process *os.Process
LastAccessTime time.Time // used for expiring entries
}

func (es *ExecutorStatus) IsClosed() bool {
return !es.StopTime.IsZero()
}
50 changes: 21 additions & 29 deletions driver/context_driver_on_interrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/chrislusf/glow/driver/plan"
"github.com/chrislusf/glow/driver/scheduler"
"github.com/chrislusf/glow/flow"
"github.com/chrislusf/glow/resource"
"github.com/chrislusf/glow/util"
)

Expand Down Expand Up @@ -57,27 +56,31 @@ func (fcd *FlowContextDriver) printDistributedStatus(sched *scheduler.Scheduler,
for _, tg := range stepGroup.TaskGroups {
stat := stats[tg.Id]
firstTask := tg.Tasks[0]
// lastTask := tg.Tasks[len(tg.Tasks)-1]
step := firstTask.Step
if stat == nil {
fmt.Printf(" No status.\n")
continue
}
if stat.Closed() {
fmt.Printf(" %s taskId:%d time:%v completed %d\n", stat.Allocation.Location.URL(), firstTask.Id, stat.TimeTaken(), 0)
if stat.IsClosed() {
fmt.Printf(" %s taskId:%d time:%v\n", stat.Allocation.Location.URL(), firstTask.Id, stat.TimeTaken())
} else {
fmt.Printf(" %s taskId:%d time:%v processed %d\n", stat.Allocation.Location.URL(), firstTask.Id, stat.TimeTaken(), 0)
fmt.Printf(" %s taskId:%d time:%v\n", stat.Allocation.Location.URL(), firstTask.Id, stat.TimeTaken())
}
if !stat.IsClosed() {
for _, inputStat := range stat.InputChannelStatuses {
fmt.Printf(" input : d%d_%d %d\n", step.Id, firstTask.Id, inputStat.Length)
}
}
for _, outputStat := range stat.OutputChannelStatuses {
fmt.Printf(" output : d%d_%d %d\n", step.Id, firstTask.Id, outputStat.Length)
}
}

}
fmt.Print("\n")
}

type RemoteExecutorStatus struct {
ExecutorStatus
Allocation resource.Allocation
taskGroup *plan.TaskGroup
}

func (fcd *FlowContextDriver) collectStatusFromRemoteExecutors(sched *scheduler.Scheduler) []*RemoteExecutorStatus {
stats := make([]*RemoteExecutorStatus, len(fcd.taskGroups))
var wg sync.WaitGroup
Expand Down Expand Up @@ -108,7 +111,7 @@ func (fcd *FlowContextDriver) collectStatusFromRemoteExecutors(sched *scheduler.
return stats
}

func askExecutorStatusForRequest(server string, requestId int32) (*RemoteExecutorStatus, error) {
func askExecutorStatusForRequest(server string, requestId uint32) (*RemoteExecutorStatus, error) {

reply, err := scheduler.RemoteDirectCommand(server, scheduler.NewGetStatusRequest(requestId))
if err != nil {
Expand All @@ -117,29 +120,18 @@ func askExecutorStatusForRequest(server string, requestId int32) (*RemoteExecuto

response := reply.GetGetStatusResponse()

var inputStatuses []*util.ChannelStatus
for _, inputStatus := range response.GetInputStatuses() {
inputStatuses = append(inputStatuses, &util.ChannelStatus{
Length: inputStatus.GetLength(),
StartTime: time.Unix(inputStatus.GetStartTime(), 0),
StopTime: time.Unix(inputStatus.GetStopTime(), 0),
})
}

return &RemoteExecutorStatus{
ExecutorStatus: ExecutorStatus{
InputChannelStatuses: inputStatuses,
OutputChannelStatus: &util.ChannelStatus{
Length: response.GetOutputStatus().GetLength(),
},
RequestTime: time.Unix(response.GetRequestTime(), 0),
StartTime: time.Unix(response.GetStartTime(), 0),
StopTime: time.Unix(response.GetStopTime(), 0),
ExecutorStatus: util.ExecutorStatus{
InputChannelStatuses: FromProto(response.GetInputStatuses()),
OutputChannelStatuses: FromProto(response.GetOutputStatuses()),
RequestTime: time.Unix(response.GetRequestTime(), 0),
StartTime: time.Unix(response.GetStartTime(), 0),
StopTime: time.Unix(response.GetStopTime(), 0),
},
}, nil
}

func askExecutorToStopRequest(server string, requestId int32) (err error) {
func askExecutorToStopRequest(server string, requestId uint32) (err error) {
_, err = scheduler.RemoteDirectCommand(server, scheduler.NewStopRequest(requestId))
return
}
37 changes: 25 additions & 12 deletions driver/executor_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,37 @@ package driver
import (
"time"

"github.com/chrislusf/glow/driver/cmd"
"github.com/chrislusf/glow/driver/plan"
"github.com/chrislusf/glow/resource"
"github.com/chrislusf/glow/util"
"github.com/golang/protobuf/proto"
)

type ExecutorStatus struct {
InputChannelStatuses []*util.ChannelStatus
OutputChannelStatus *util.ChannelStatus
RequestTime time.Time
StartTime time.Time
StopTime time.Time
type RemoteExecutorStatus struct {
util.ExecutorStatus
Allocation resource.Allocation
taskGroup *plan.TaskGroup
}

func (s *ExecutorStatus) Closed() bool {
return !s.StopTime.IsZero()
// ToProto converts channel statuses into their protobuf form, one output
// element per input element and in the same order. StartTime and StopTime
// are encoded as Unix seconds. An empty or nil input yields a nil slice.
func ToProto(channelStatuses []*util.ChannelStatus) (ret []*cmd.ChannelStatus) {
	for i := range channelStatuses {
		src := channelStatuses[i]

		converted := &cmd.ChannelStatus{}
		converted.Length = proto.Int64(src.Length)
		converted.StartTime = proto.Int64(src.StartTime.Unix())
		converted.StopTime = proto.Int64(src.StopTime.Unix())

		ret = append(ret, converted)
	}
	return ret
}

func (s *ExecutorStatus) TimeTaken() time.Duration {
if s.Closed() {
return s.StopTime.Sub(s.RequestTime)
func FromProto(channelStatuses []*cmd.ChannelStatus) (ret []*util.ChannelStatus) {
for _, stat := range channelStatuses {
ret = append(ret, &util.ChannelStatus{
Length: stat.GetLength(),
StartTime: time.Unix(stat.GetStartTime(), 0),
StopTime: time.Unix(stat.GetStopTime(), 0),
})
}
return time.Now().Sub(s.RequestTime)
return
}
2 changes: 1 addition & 1 deletion driver/plan/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type TaskGroup struct {
Tasks []*flow.Task
Parents []*TaskGroup
ParentStepGroup *StepGroup
RequestId int32 // id for actual request when running
RequestId uint32 // id for actual request when running
}

type StepGroup struct {
Expand Down
12 changes: 6 additions & 6 deletions driver/scheduler/command_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewStartRequest(path string, dir string, args []string, allocated resource.
},
Envs: envs,
Port: proto.Int32(port),
HashCode: proto.Int32(0),
HashCode: proto.Uint32(0),
},
}

Expand All @@ -39,25 +39,25 @@ func NewStartRequest(path string, dir string, args []string, allocated resource.
log.Fatalf("marshaling start request error: %v", err)
return nil
}
request.StartRequest.HashCode = proto.Int32(int32(util.Hash(data)))
request.StartRequest.HashCode = proto.Uint32(uint32(util.Hash(data)))

return request
}

func NewGetStatusRequest(requestId int32) *cmd.ControlMessage {
func NewGetStatusRequest(requestId uint32) *cmd.ControlMessage {
return &cmd.ControlMessage{
Type: cmd.ControlMessage_GetStatusRequest.Enum(),
GetStatusRequest: &cmd.GetStatusRequest{
StartRequestHash: proto.Int32(requestId),
StartRequestHash: proto.Uint32(requestId),
},
}
}

func NewStopRequest(requestId int32) *cmd.ControlMessage {
func NewStopRequest(requestId uint32) *cmd.ControlMessage {
return &cmd.ControlMessage{
Type: cmd.ControlMessage_StopRequest.Enum(),
StopRequest: &cmd.StopRequest{
StartRequestHash: proto.Int32(requestId),
StartRequestHash: proto.Uint32(requestId),
},
}
}
Expand Down
6 changes: 3 additions & 3 deletions driver/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Scheduler struct {
Market *market.Market
option *SchedulerOption
shardLocator *DatasetShardLocator
RemoteExecutorStatuses map[int32]*RemoteExecutorStatus
RemoteExecutorStatuses map[uint32]*RemoteExecutorStatus
}

type RemoteExecutorStatus struct {
Expand Down Expand Up @@ -49,13 +49,13 @@ func NewScheduler(leader string, option *SchedulerOption) *Scheduler {
Market: market.NewMarket(),
shardLocator: NewDatasetShardLocator(option.ExecutableFileHash),
option: option,
RemoteExecutorStatuses: make(map[int32]*RemoteExecutorStatus),
RemoteExecutorStatuses: make(map[uint32]*RemoteExecutorStatus),
}
s.Market.SetScoreFunction(s.Score).SetFetchFunction(s.Fetch)
return s
}

func (s *Scheduler) getRemoteExecutorStatus(id int32) (status *RemoteExecutorStatus, isOld bool) {
func (s *Scheduler) getRemoteExecutorStatus(id uint32) (status *RemoteExecutorStatus, isOld bool) {
s.Lock()
defer s.Unlock()

Expand Down
4 changes: 3 additions & 1 deletion driver/task_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
type TaskOption struct {
ContextId int
TaskGroupId int
FistTaskName string
FistTaskName string // only for debugging purpose, show it in command line
Inputs string
ExecutableFileHash string
ChannelBufferSize int
RequestId uint64
}

var taskOption TaskOption
Expand All @@ -24,6 +25,7 @@ func init() {
flag.StringVar(&taskOption.Inputs, "glow.taskGroup.inputs", "", "comma and @ seperated input locations")
flag.StringVar(&taskOption.ExecutableFileHash, "glow.exe.hash", "", "hash of executable binary file")
flag.IntVar(&taskOption.ChannelBufferSize, "glow.channel.bufferSize", 0, "channel buffer size for reading inputs")
flag.Uint64Var(&taskOption.RequestId, "glow.request.id", 0, "request id received from agent")

flow.RegisterTaskRunner(NewTaskRunner(&taskOption))
}
Loading

0 comments on commit c8dea04

Please sign in to comment.