Skip to content

Commit

Permalink
control+c now can stop distributed executions
Browse files Browse the repository at this point in the history
chrislusf committed Jan 16, 2016
1 parent 5728bf9 commit 5080fab
Showing 9 changed files with 66 additions and 5 deletions.
4 changes: 4 additions & 0 deletions agent/agent_server_cmd.go
Original file line number Diff line number Diff line change
@@ -27,5 +27,9 @@ func (as *AgentServer) handleCommandConnection(conn net.Conn,
reply.Type = cmd.ControlMessage_GetStatusResponse.Enum()
reply.GetStatusResponse = as.handleStatus(command.GetStatusRequest)
}
if command.GetType() == cmd.ControlMessage_StopRequest {
reply.Type = cmd.ControlMessage_StopResponse.Enum()
reply.StopResponse = as.handleStopRequest(command.StopRequest)
}
return reply
}
1 change: 1 addition & 0 deletions agent/agent_server_executor_start.go
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ func (as *AgentServer) handleStart(conn net.Conn,
} else {
reply.Pid = proto.Int32(int32(cmd.Process.Pid))
}
stat.Process = cmd.Process

cmd.Wait()
stat.StopTime = time.Now()
15 changes: 12 additions & 3 deletions agent/agent_server_executor_status.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package agent

import (
"fmt"

"github.com/chrislusf/glow/driver/cmd"
"github.com/golang/protobuf/proto"
)
@@ -18,7 +16,18 @@ func (as *AgentServer) handleStatus(getStatusRequest *cmd.GetStatusRequest) *cmd
StopTime: proto.Int64(stat.StopTime.Unix()),
}

fmt.Printf("stat: %v\n", stat)
return reply
}

func (as *AgentServer) handleStopRequest(stopRequest *cmd.StopRequest) *cmd.StopResponse {
requestId := stopRequest.GetStartRequestHash()
stat := as.localExecutorManager.getExecutorStatus(requestId)

stat.Process.Kill()

reply := &cmd.StopResponse{
StartRequestHash: proto.Int32(requestId),
}

return reply
}
2 changes: 2 additions & 0 deletions agent/local_executor_status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"os"
"time"
)

@@ -11,6 +12,7 @@ type ExecutorStatus struct {
OutputLength int
StartTime time.Time
StopTime time.Time
Process *os.Process
LastAccessTime time.Time // used for expiring entries
}

2 changes: 2 additions & 0 deletions driver/context_driver.go
Original file line number Diff line number Diff line change
@@ -106,6 +106,8 @@ func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

flow.OnInterrupt(func() {
fcd.OnInterrupt(fc, sched)
}, func() {
fcd.OnExit(fc, sched)
})

// schedule to run the steps
31 changes: 31 additions & 0 deletions driver/context_driver_on_interrupt.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,32 @@ func (fcd *FlowContextDriver) OnInterrupt(
fcd.printDistributedStatus(sched, status)
}

func (fcd *FlowContextDriver) OnExit(
fc *flow.FlowContext,
sched *scheduler.Scheduler) {
var wg sync.WaitGroup
for _, tg := range fcd.taskGroups {
wg.Add(1)
go func(tg *plan.TaskGroup) {
defer wg.Done()

requestId := tg.RequestId
request, ok := sched.RemoteExecutorStatuses[requestId]
if !ok {
fmt.Printf("No executors for %v\n", tg)
return
}
// println("checking", request.Allocation.Location.URL(), requestId)
if err := askExecutorToStopRequest(request.Allocation.Location.URL(), requestId); err != nil {
fmt.Printf("Error to stop request %d on %s: %v\n", request.Allocation.Location.URL(), requestId, err)
return
}
}(tg)
}
wg.Wait()

}

func (fcd *FlowContextDriver) printDistributedStatus(sched *scheduler.Scheduler, stats []*RemoteExecutorStatus) {
fmt.Print("\n")
for _, stepGroup := range fcd.stepGroups {
@@ -112,3 +138,8 @@ func askExecutorStatusForRequest(server string, requestId int32) (*RemoteExecuto
},
}, nil
}

func askExecutorToStopRequest(server string, requestId int32) (err error) {
_, err = scheduler.RemoteDirectCommand(server, scheduler.NewStopRequest(requestId))
return
}
9 changes: 9 additions & 0 deletions driver/scheduler/command_execution.go
Original file line number Diff line number Diff line change
@@ -53,6 +53,15 @@ func NewGetStatusRequest(requestId int32) *cmd.ControlMessage {
}
}

func NewStopRequest(requestId int32) *cmd.ControlMessage {
return &cmd.ControlMessage{
Type: cmd.ControlMessage_StopRequest.Enum(),
StopRequest: &cmd.StopRequest{
StartRequestHash: proto.Int32(requestId),
},
}
}

func NewDeleteDatasetShardRequest(name string) *cmd.ControlMessage {
return &cmd.ControlMessage{
Type: cmd.ControlMessage_DeleteDatasetShardRequest.Enum(),
2 changes: 1 addition & 1 deletion flow/context_run.go
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ func (fc *FlowContext) runFlowContextInStandAloneMode() {

isDatasetStarted := make(map[int]bool)

OnInterrupt(fc.OnInterrupt)
OnInterrupt(fc.OnInterrupt, nil)

// start all task edges
for _, step := range fc.Steps {
5 changes: 4 additions & 1 deletion flow/signal_handling.go
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import (
"syscall"
)

func OnInterrupt(fn func()) {
func OnInterrupt(fn func(), onExitFunc func()) {
// deal with control+c,etc
signalChan := make(chan os.Signal, 1)
// controlling terminal close, daemon not exit
@@ -28,6 +28,9 @@ func OnInterrupt(fn func()) {
for sig := range signalChan {
fn()
if sig != syscall.SIGINFO {
if onExitFunc != nil {
onExitFunc()
}
os.Exit(0)
}
}

0 comments on commit 5080fab

Please sign in to comment.