Skip to content

Commit

Permalink
add net channel status, and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Jan 11, 2016
1 parent 7351b8a commit f07aaaa
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 51 deletions.
33 changes: 26 additions & 7 deletions driver/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,34 @@ import (
"reflect"
"strings"
"sync"
"time"

"github.com/chrislusf/glow/driver/plan"
"github.com/chrislusf/glow/flow"
"github.com/chrislusf/glow/netchan"
"github.com/chrislusf/glow/util"
)

type TaskRunner struct {
option *TaskOption
Tasks []*flow.Task
FlowContext *flow.FlowContext
option *TaskOption
Tasks []*flow.Task
FlowContext *flow.FlowContext
executorStatus *ExecutorStatus
}

type ExecutorStatus struct {
InputChannelStatuses []*util.ChannelStatus
OutputChannelStatus *util.ChannelStatus
ReadyTime time.Time
StartTime time.Time
StopTime time.Time
}

func NewTaskRunner(option *TaskOption) *TaskRunner {
return &TaskRunner{option: option}
return &TaskRunner{
option: option,
executorStatus: &ExecutorStatus{},
}
}

func (tr *TaskRunner) IsTaskMode() bool {
Expand All @@ -36,6 +50,8 @@ func (tr *TaskRunner) Run(fc *flow.FlowContext) {
tr.Tasks = plan.GroupTasks(fc)[tr.option.TaskGroupId].Tasks
tr.FlowContext = fc

tr.executorStatus.StartTime = time.Now()

// println("taskGroup", tr.Tasks[0].Name(), "starts")
// 4. setup task input and output channels
var wg sync.WaitGroup
Expand All @@ -52,6 +68,7 @@ func (tr *TaskRunner) Run(fc *flow.FlowContext) {
// 7. need to close connected output channels
wg.Wait()
// println("taskGroup", tr.Tasks[0].Name(), "finishes")
tr.executorStatus.StopTime = time.Now()
}

func (tr *TaskRunner) connectInputsAndOutputs(wg *sync.WaitGroup) {
Expand Down Expand Up @@ -103,7 +120,8 @@ func (tr *TaskRunner) connectExternalInputs(wg *sync.WaitGroup, name2Location ma
if err != nil {
log.Panic(err)
}
netchan.ConnectRawReadChannelToTyped(rawChan, firstTask.InputChans[i], d.Type, wg)
inChanStatus := netchan.ConnectRawReadChannelToTyped(rawChan, firstTask.InputChans[i], d.Type, wg)
tr.executorStatus.InputChannelStatuses = append(tr.executorStatus.InputChannelStatuses, inChanStatus)
}
}

Expand All @@ -121,7 +139,8 @@ func (tr *TaskRunner) connectExternalInputChannels(wg *sync.WaitGroup) {
log.Panic(err)
}
typedInputChan := make(chan reflect.Value)
netchan.ConnectRawReadChannelToTyped(rawChan, typedInputChan, ds.Type, wg)
inChanStatus := netchan.ConnectRawReadChannelToTyped(rawChan, typedInputChan, ds.Type, wg)
tr.executorStatus.InputChannelStatuses = append(tr.executorStatus.InputChannelStatuses, inChanStatus)
firstTask.InputChans = append(firstTask.InputChans, typedInputChan)
}
}
Expand All @@ -135,6 +154,6 @@ func (tr *TaskRunner) connectExternalOutputs(wg *sync.WaitGroup) {
if err != nil {
log.Panic(err)
}
netchan.ConnectTypedWriteChannelToRaw(shard.WriteChan, rawChan, wg)
tr.executorStatus.OutputChannelStatus = netchan.ConnectTypedWriteChannelToRaw(shard.WriteChan, rawChan, wg)
}
}
4 changes: 2 additions & 2 deletions flow/step_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"reflect"
"strconv"

"github.com/chrislusf/glow/netchan"
"github.com/chrislusf/glow/util"
)

type Task struct {
Expand Down Expand Up @@ -74,5 +74,5 @@ func (t *Task) MergedInputChan() chan reflect.Value {
for _, c := range t.InputChans {
prevChans = append(prevChans, c)
}
return netchan.MergeChannel(prevChans)
return util.MergeChannel(prevChans)
}
60 changes: 18 additions & 42 deletions netchan/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,25 @@ import (

"github.com/chrislusf/glow/netchan/receiver"
"github.com/chrislusf/glow/netchan/sender"
"github.com/chrislusf/glow/util"
)

type NetworkContext struct {
type NetworkOption struct {
AgentPort int
}

var networkContext NetworkContext
var networkOption NetworkOption

func init() {
flag.IntVar(&networkContext.AgentPort, "glow.agent.port", 8931, "agent port")
flag.IntVar(&networkOption.AgentPort, "glow.agent.port", 8931, "agent port")
}

func GetLocalSendChannel(name string, wg *sync.WaitGroup) (chan []byte, error) {
return sender.NewSendChannel(name, networkContext.AgentPort, wg)
return sender.NewSendChannel(name, networkOption.AgentPort, wg)
}

func GetLocalReadChannel(name string, chanBufferSize int) (chan []byte, error) {
return GetDirectReadChannel(name, "localhost:"+strconv.Itoa(networkContext.AgentPort), chanBufferSize)
return GetDirectReadChannel(name, "localhost:"+strconv.Itoa(networkOption.AgentPort), chanBufferSize)
}

func GetDirectReadChannel(name, location string, chanBufferSize int) (chan []byte, error) {
Expand All @@ -42,11 +43,12 @@ func GetDirectSendChannel(name string, target string, wg *sync.WaitGroup) (chan
return sender.NewDirectSendChannel(name, target, wg)
}

func ConnectRawReadChannelToTyped(c chan []byte, out chan reflect.Value, t reflect.Type, wg *sync.WaitGroup) chan reflect.Value {

func ConnectRawReadChannelToTyped(c chan []byte, out chan reflect.Value, t reflect.Type, wg *sync.WaitGroup) (status *util.ChannelStatus) {
status = util.NewChannelStatus()
wg.Add(1)
go func() {
defer wg.Done()
status.ReportStart()

for data := range c {
dec := gob.NewDecoder(bytes.NewBuffer(data))
Expand All @@ -55,20 +57,22 @@ func ConnectRawReadChannelToTyped(c chan []byte, out chan reflect.Value, t refle
log.Fatal("data type:", v.Kind(), " decode error:", err)
} else {
out <- reflect.Indirect(v)
status.ReportAdd(1)
}
}

close(out)
status.ReportClose()
}()

return out

return status
}

func ConnectTypedWriteChannelToRaw(writeChan reflect.Value, c chan []byte, wg *sync.WaitGroup) {
func ConnectTypedWriteChannelToRaw(writeChan reflect.Value, c chan []byte, wg *sync.WaitGroup) (status *util.ChannelStatus) {
status = util.NewChannelStatus()
wg.Add(1)
go func() {
defer wg.Done()
status.ReportStart()

var t reflect.Value
for ok := true; ok; {
Expand All @@ -79,39 +83,11 @@ func ConnectTypedWriteChannelToRaw(writeChan reflect.Value, c chan []byte, wg *s
log.Fatal("data type:", t.Type().String(), " ", t.Kind(), " encode error:", err)
}
c <- buf.Bytes()
status.ReportAdd(1)
}
}
close(c)

}()

}

func MergeChannel(cs []chan reflect.Value) (out chan reflect.Value) {
out = make(chan reflect.Value)
MergeChannelTo(cs, nil, out)
return out
}

func MergeChannelTo(cs []chan reflect.Value, transformFn func(reflect.Value) reflect.Value, out chan reflect.Value) {
var wg sync.WaitGroup

for _, c := range cs {
wg.Add(1)
go func(c chan reflect.Value) {
defer wg.Done()
for n := range c {
if transformFn != nil {
n = transformFn(n)
}
out <- n
}
}(c)
}

go func() {
wg.Wait()
close(out)
status.ReportClose()
}()
return
return status
}
27 changes: 27 additions & 0 deletions util/channel_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package util

import (
"time"
)

type ChannelStatus struct {
Length int
StartTime time.Time
StopTime time.Time
}

func NewChannelStatus() *ChannelStatus {
return &ChannelStatus{}
}

func (s *ChannelStatus) ReportStart() {
s.StartTime = time.Now()
}

func (s *ChannelStatus) ReportAdd(delta int) {
s.Length += delta
}

func (s *ChannelStatus) ReportClose() {
s.StopTime = time.Now()
}
35 changes: 35 additions & 0 deletions util/channel_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"reflect"
"sync"
)

func MergeChannel(cs []chan reflect.Value) (out chan reflect.Value) {
out = make(chan reflect.Value)
MergeChannelTo(cs, nil, out)
return out
}

func MergeChannelTo(cs []chan reflect.Value, transformFn func(reflect.Value) reflect.Value, out chan reflect.Value) {
var wg sync.WaitGroup

for _, c := range cs {
wg.Add(1)
go func(c chan reflect.Value) {
defer wg.Done()
for n := range c {
if transformFn != nil {
n = transformFn(n)
}
out <- n
}
}(c)
}

go func() {
wg.Wait()
close(out)
}()
return
}

0 comments on commit f07aaaa

Please sign in to comment.