diff --git a/driver/task_runner.go b/driver/task_runner.go index c3ecf57..b3c2527 100644 --- a/driver/task_runner.go +++ b/driver/task_runner.go @@ -6,20 +6,34 @@ import ( "reflect" "strings" "sync" + "time" "github.com/chrislusf/glow/driver/plan" "github.com/chrislusf/glow/flow" "github.com/chrislusf/glow/netchan" + "github.com/chrislusf/glow/util" ) type TaskRunner struct { - option *TaskOption - Tasks []*flow.Task - FlowContext *flow.FlowContext + option *TaskOption + Tasks []*flow.Task + FlowContext *flow.FlowContext + executorStatus *ExecutorStatus +} + +type ExecutorStatus struct { + InputChannelStatuses []*util.ChannelStatus + OutputChannelStatus *util.ChannelStatus + ReadyTime time.Time + StartTime time.Time + StopTime time.Time } func NewTaskRunner(option *TaskOption) *TaskRunner { - return &TaskRunner{option: option} + return &TaskRunner{ + option: option, + executorStatus: &ExecutorStatus{}, + } } func (tr *TaskRunner) IsTaskMode() bool { @@ -36,6 +50,8 @@ func (tr *TaskRunner) Run(fc *flow.FlowContext) { tr.Tasks = plan.GroupTasks(fc)[tr.option.TaskGroupId].Tasks tr.FlowContext = fc + tr.executorStatus.StartTime = time.Now() + // println("taskGroup", tr.Tasks[0].Name(), "starts") // 4. setup task input and output channels var wg sync.WaitGroup @@ -52,6 +68,7 @@ func (tr *TaskRunner) Run(fc *flow.FlowContext) { // 7. need to close connected output channels wg.Wait() // println("taskGroup", tr.Tasks[0].Name(), "finishes") + tr.executorStatus.StopTime = time.Now() } func (tr *TaskRunner) connectInputsAndOutputs(wg *sync.WaitGroup) { @@ -103,7 +120,8 @@ func (tr *TaskRunner) connectExternalInputs(wg *sync.WaitGroup, name2Location ma if err != nil { log.Panic(err) } - netchan.ConnectRawReadChannelToTyped(rawChan, firstTask.InputChans[i], d.Type, wg) + inChanStatus := netchan.ConnectRawReadChannelToTyped(rawChan, firstTask.InputChans[i], d.Type, wg) + tr.executorStatus.InputChannelStatuses = append(tr.executorStatus.InputChannelStatuses, inChanStatus) } } @@ -121,7 +139,8 @@ func (tr *TaskRunner) connectExternalInputChannels(wg *sync.WaitGroup) { log.Panic(err) } typedInputChan := make(chan reflect.Value) - netchan.ConnectRawReadChannelToTyped(rawChan, typedInputChan, ds.Type, wg) + inChanStatus := netchan.ConnectRawReadChannelToTyped(rawChan, typedInputChan, ds.Type, wg) + tr.executorStatus.InputChannelStatuses = append(tr.executorStatus.InputChannelStatuses, inChanStatus) firstTask.InputChans = append(firstTask.InputChans, typedInputChan) } } @@ -135,6 +154,6 @@ func (tr *TaskRunner) connectExternalOutputs(wg *sync.WaitGroup) { if err != nil { log.Panic(err) } - netchan.ConnectTypedWriteChannelToRaw(shard.WriteChan, rawChan, wg) + tr.executorStatus.OutputChannelStatus = netchan.ConnectTypedWriteChannelToRaw(shard.WriteChan, rawChan, wg) } } diff --git a/flow/step_task.go b/flow/step_task.go index ca9d1a4..dc9e43b 100644 --- a/flow/step_task.go +++ b/flow/step_task.go @@ -6,7 +6,7 @@ import ( "reflect" "strconv" - "github.com/chrislusf/glow/netchan" + "github.com/chrislusf/glow/util" ) type Task struct { @@ -74,5 +74,5 @@ func (t *Task) MergedInputChan() chan reflect.Value { for _, c := range t.InputChans { prevChans = append(prevChans, c) } - return netchan.MergeChannel(prevChans) + return util.MergeChannel(prevChans) } diff --git a/netchan/channels.go b/netchan/channels.go index 52f8f8d..e78ba70 100644 --- a/netchan/channels.go +++ b/netchan/channels.go @@ -13,24 +13,25 @@ import ( "github.com/chrislusf/glow/netchan/receiver" "github.com/chrislusf/glow/netchan/sender" + "github.com/chrislusf/glow/util" ) -type NetworkContext struct { +type NetworkOption struct { AgentPort int } -var networkContext NetworkContext +var networkOption NetworkOption func init() { - flag.IntVar(&networkContext.AgentPort, "glow.agent.port", 8931, "agent port") + flag.IntVar(&networkOption.AgentPort, "glow.agent.port", 8931, "agent port") } func GetLocalSendChannel(name string, wg *sync.WaitGroup) (chan []byte, error) { - return sender.NewSendChannel(name, networkContext.AgentPort, wg) + return sender.NewSendChannel(name, networkOption.AgentPort, wg) } func GetLocalReadChannel(name string, chanBufferSize int) (chan []byte, error) { - return GetDirectReadChannel(name, "localhost:"+strconv.Itoa(networkContext.AgentPort), chanBufferSize) + return GetDirectReadChannel(name, "localhost:"+strconv.Itoa(networkOption.AgentPort), chanBufferSize) } func GetDirectReadChannel(name, location string, chanBufferSize int) (chan []byte, error) { @@ -42,11 +43,12 @@ func GetDirectSendChannel(name string, target string, wg *sync.WaitGroup) (chan return sender.NewDirectSendChannel(name, target, wg) } -func ConnectRawReadChannelToTyped(c chan []byte, out chan reflect.Value, t reflect.Type, wg *sync.WaitGroup) chan reflect.Value { - +func ConnectRawReadChannelToTyped(c chan []byte, out chan reflect.Value, t reflect.Type, wg *sync.WaitGroup) (status *util.ChannelStatus) { + status = util.NewChannelStatus() wg.Add(1) go func() { defer wg.Done() + status.ReportStart() for data := range c { dec := gob.NewDecoder(bytes.NewBuffer(data)) @@ -55,20 +57,22 @@ func ConnectRawReadChannelToTyped(c chan []byte, out chan reflect.Value, t refle log.Fatal("data type:", v.Kind(), " decode error:", err) } else { out <- reflect.Indirect(v) + status.ReportAdd(1) } } close(out) + status.ReportClose() }() - - return out - + return status } -func ConnectTypedWriteChannelToRaw(writeChan reflect.Value, c chan []byte, wg *sync.WaitGroup) { +func ConnectTypedWriteChannelToRaw(writeChan reflect.Value, c chan []byte, wg *sync.WaitGroup) (status *util.ChannelStatus) { + status = util.NewChannelStatus() wg.Add(1) go func() { defer wg.Done() + status.ReportStart() var t reflect.Value for ok := true; ok; { @@ -79,39 +83,11 @@ func ConnectTypedWriteChannelToRaw(writeChan reflect.Value, c chan []byte, wg *s log.Fatal("data type:", t.Type().String(), " ", t.Kind(), " encode error:", err) } c <- buf.Bytes() + status.ReportAdd(1) } } close(c) - - }() - -} - -func MergeChannel(cs []chan reflect.Value) (out chan reflect.Value) { - out = make(chan reflect.Value) - MergeChannelTo(cs, nil, out) - return out -} - -func MergeChannelTo(cs []chan reflect.Value, transformFn func(reflect.Value) reflect.Value, out chan reflect.Value) { - var wg sync.WaitGroup - - for _, c := range cs { - wg.Add(1) - go func(c chan reflect.Value) { - defer wg.Done() - for n := range c { - if transformFn != nil { - n = transformFn(n) - } - out <- n - } - }(c) - } - - go func() { - wg.Wait() - close(out) + status.ReportClose() }() - return + return status } diff --git a/util/channel_status.go b/util/channel_status.go new file mode 100644 index 0000000..f079533 --- /dev/null +++ b/util/channel_status.go @@ -0,0 +1,27 @@ +package util + +import ( + "time" +) + +type ChannelStatus struct { + Length int + StartTime time.Time + StopTime time.Time +} + +func NewChannelStatus() *ChannelStatus { + return &ChannelStatus{} +} + +func (s *ChannelStatus) ReportStart() { + s.StartTime = time.Now() +} + +func (s *ChannelStatus) ReportAdd(delta int) { + s.Length += delta +} + +func (s *ChannelStatus) ReportClose() { + s.StopTime = time.Now() +} diff --git a/util/channel_util.go b/util/channel_util.go new file mode 100644 index 0000000..c032876 --- /dev/null +++ b/util/channel_util.go @@ -0,0 +1,35 @@ +package util + +import ( + "reflect" + "sync" +) + +func MergeChannel(cs []chan reflect.Value) (out chan reflect.Value) { + out = make(chan reflect.Value) + MergeChannelTo(cs, nil, out) + return out +} + +func MergeChannelTo(cs []chan reflect.Value, transformFn func(reflect.Value) reflect.Value, out chan reflect.Value) { + var wg sync.WaitGroup + + for _, c := range cs { + wg.Add(1) + go func(c chan reflect.Value) { + defer wg.Done() + for n := range c { + if transformFn != nil { + n = transformFn(n) + } + out <- n + } + }(c) + } + + go func() { + wg.Wait() + close(out) + }() + return +}