Skip to content

Commit

Permalink
working output channel for a dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Oct 14, 2015
1 parent 386639c commit e757a33
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 117 deletions.
6 changes: 3 additions & 3 deletions agent/agent_server_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func (as *AgentServer) handleWriteConnection(r io.Reader, name string) {
ds = as.name2Store[name]

}
//register stream
go client.NewHeartBeater(as.Port, "localhost:8930").StartChannelHeartBeat(ds.killHeartBeater, name)

as.name2StoreLock.Unlock()

//register stream
go client.NewHeartBeater(as.Port, *as.Option.Leader).StartChannelHeartBeat(ds.killHeartBeater, name)

// println(name, "start writing.")

buf := make([]byte, 4)
Expand Down
23 changes: 19 additions & 4 deletions driver/context_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type DriverOption struct {
Rack string
PlotOutput bool
TaskMemoryMB int
FlowBid float64
}

func init() {
Expand All @@ -26,6 +27,7 @@ func init() {
flag.StringVar(&driverOption.DataCenter, "driver.dataCenter", "", "preferred data center name")
flag.StringVar(&driverOption.Rack, "driver.rack", "", "preferred rack name")
flag.IntVar(&driverOption.TaskMemoryMB, "driver.task.memoryMB", 64, "request one task memory size in MB")
flag.Float64Var(&driverOption.FlowBid, "driver.flow.bid", 100.0, "total bid price in a flow to compete for resources")
flag.BoolVar(&driverOption.PlotOutput, "driver.plot.flow", false, "print out task group flow in graphviz dot format")

flow.RegisterContextRunner(NewFlowContextDriver(&driverOption))
Expand All @@ -49,15 +51,15 @@ func (fcd *FlowContextDriver) IsDriverPlotMode() bool {

func (fcd *FlowContextDriver) Plot(fc *flow.FlowContext) {
taskGroups := scheduler.GroupTasks(fc)
scheduler.PlotGraph(taskGroups)
scheduler.PlotGraph(taskGroups, fc)
}

// driver runs on local, controlling all tasks
func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

taskGroups := scheduler.GroupTasks(fc)
if fcd.option.PlotOutput {
scheduler.PlotGraph(taskGroups)
scheduler.PlotGraph(taskGroups, fc)
return
}

Expand All @@ -73,23 +75,36 @@ func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {
TaskMemoryMB: fcd.option.TaskMemoryMB,
},
)
defer fcd.Cleanup(sched, fc, taskGroups)

go sched.EventLoop()

// schedule to run the steps
var wg sync.WaitGroup
for i, taskGroup := range taskGroups {
for _, taskGroup := range taskGroups {
wg.Add(1)
sched.EventChan <- scheduler.SubmitTaskGroup{
FlowContext: fc,
TaskGroup: taskGroup,
Bid: len(taskGroups) - i,
Bid: fcd.option.FlowBid / float64(len(taskGroups)),
WaitGroup: &wg,
}
}
go sched.Market.FetcherLoop()

wg.Wait()

for _, ds := range fc.Datasets {
if len(ds.OutputChans) > 0 {
for _, ch := range ds.OutputChans {
ch.Close()
}
}
}
}

func (fcd *FlowContextDriver) Cleanup(sched *scheduler.Scheduler, fc *flow.FlowContext, taskGroups []*scheduler.TaskGroup) {
var wg sync.WaitGroup
wg.Add(1)
sched.EventChan <- scheduler.ReleaseTaskGroupInputs{
FlowContext: fc,
Expand Down
6 changes: 0 additions & 6 deletions driver/network_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"flag"
"sync"

"github.com/chrislusf/glow/io/receiver"
"github.com/chrislusf/glow/io/sender"
)

Expand All @@ -23,8 +22,3 @@ func init() {
func GetSendChannel(name string, wg *sync.WaitGroup) (chan []byte, error) {
return sender.NewChannel(name, networkContext.AgentPort, wg)
}

// GetDirectReadChannel builds a receive channel for name (passing 0 as the
// second argument to receiver.NewReceiveChannel) and returns the result of
// asking it for a direct channel to location.
func GetDirectReadChannel(name, location string) (chan []byte, error) {
	return receiver.NewReceiveChannel(name, 0).GetDirectChannel(location)
}
22 changes: 18 additions & 4 deletions driver/scheduler/group_plot.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
)

type FlowGraph struct {
taskGroups []*TaskGroup
out *bytes.Buffer
flowContext *flow.FlowContext
taskGroups []*TaskGroup
out *bytes.Buffer
}

func PlotGraph(taskGroups []*TaskGroup) {
func PlotGraph(taskGroups []*TaskGroup, fc *flow.FlowContext) {
var buffer bytes.Buffer
fg := &FlowGraph{taskGroups, &buffer}
fg := &FlowGraph{fc, taskGroups, &buffer}
fg.plot()

fmt.Println(buffer.String())
Expand Down Expand Up @@ -74,6 +75,15 @@ func (fg *FlowGraph) plot() {
}
}

for _, ds := range fg.flowContext.Datasets {
if len(ds.OutputChans) > 0 {
fg.w(prefix).output(ds).println(" [shape=doublecircle];")
for _, dss := range ds.Shards {
fg.w(prefix).d(dss).w(" -> ").output(ds).println(";")
}
}
}

fg.w(prefix).println("center=true;")
fg.w(prefix).println("compound=true;")
fg.w(prefix).println("start [shape=Mdiamond];")
Expand Down Expand Up @@ -117,6 +127,10 @@ func (fg *FlowGraph) d(dss *flow.DatasetShard) *FlowGraph {
fg.w("d").i(dss.Parent.Id).w("_").i(dss.Id)
return fg
}
// output writes the identifier used for a dataset's output node: the literal
// "output" followed by the dataset id. It returns the receiver so calls can
// be chained.
func (fg *FlowGraph) output(ds *flow.Dataset) *FlowGraph {
	label := fg.w("output")
	label.i(ds.Id)
	return fg
}

/*
subgraph cluster0 {
Expand Down
8 changes: 4 additions & 4 deletions driver/scheduler/market/cda_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Requirement interface{}

type Demand struct {
Requirement Requirement
Bid int
Bid float64
ReturnChan chan Supply
}

Expand All @@ -36,7 +36,7 @@ type Market struct {
Demands []Demand
Supplies []Supply
Lock sync.Mutex
ScoreFn func(Requirement, int, Object) float64
ScoreFn func(Requirement, float64, Object) float64
FetchFn func([]Demand)
hasDemands *sync.Cond
}
Expand All @@ -47,7 +47,7 @@ func NewMarket() *Market {
return m
}

func (m *Market) SetScoreFunction(scorer func(Requirement, int, Object) float64) *Market {
func (m *Market) SetScoreFunction(scorer func(Requirement, float64, Object) float64) *Market {
m.ScoreFn = scorer
return m
}
Expand All @@ -58,7 +58,7 @@ func (m *Market) SetFetchFunction(fn func([]Demand)) *Market {
}

// retChan should be a buffered channel
func (m *Market) AddDemand(r Requirement, bid int, retChan chan Supply) {
func (m *Market) AddDemand(r Requirement, bid float64, retChan chan Supply) {
m.Lock.Lock()
defer m.Lock.Unlock()

Expand Down
38 changes: 34 additions & 4 deletions driver/scheduler/scheduler_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import (
// "fmt"
"log"
"os"
"reflect"
"strconv"
"sync"

"github.com/chrislusf/glow/driver/scheduler/market"
"github.com/chrislusf/glow/flow"
"github.com/chrislusf/glow/io"
"github.com/chrislusf/glow/resource"
)

type SubmitTaskGroup struct {
FlowContext *flow.FlowContext
TaskGroup *TaskGroup
Bid int
Bid float64
WaitGroup *sync.WaitGroup
}

Expand Down Expand Up @@ -61,17 +63,44 @@ func (s *Scheduler) EventLoop() {
allocation := supply.Object.(resource.Allocation)

// remember dataset location
for _, ds := range tasks[len(tasks)-1].Outputs {
name := ds.Name()
for _, dss := range tasks[len(tasks)-1].Outputs {
name := dss.Name()
location := allocation.Location
s.datasetShard2LocationLock.Lock()
s.datasetShard2Location[name] = location
waitForAllInputs.Broadcast()
s.datasetShard2LocationLock.Unlock()
}

// fmt.Printf("allocated %s on %v\n", tasks[0].Name(), allocation.Location)
// setup output channel
for _, shard := range tasks[len(tasks)-1].Outputs {
ds := shard.Parent
if len(ds.OutputChans) == 0 {
continue
}
// connect remote raw chan to local typed chan
readChanName := shard.Name()
location := s.datasetShard2Location[readChanName]
rawChan, err := io.GetDirectReadChannel(readChanName, location.URL())
if err != nil {
log.Panic(err)
}
for _, out := range ds.OutputChans {
ch := make(chan reflect.Value)
io.ConnectRawReadChannelToTyped(rawChan, ch, ds.Type, event.WaitGroup)
event.WaitGroup.Add(1)
go func() {
defer event.WaitGroup.Done()
for v := range ch {
v = io.CleanObject(v, ds.Type, out.Type().Elem())
out.Send(v)
}
}()
}
}

// fmt.Printf("allocated %s on %v\n", tasks[0].Name(), allocation.Location)
// create request
dir, _ := os.Getwd()
args := []string{
"-glow.context.id",
Expand All @@ -89,6 +118,7 @@ func (s *Scheduler) EventLoop() {
args = append(args, arg)
}
request := NewStartRequest(os.Args[0], dir, args, allocation.Allocated)

// fmt.Printf("starting on %s: %v\n", allocation.Allocated, request)
if err := RemoteDirectExecute(allocation.Location.URL(), request); err != nil {
log.Printf("exeuction error %v: %v", err, request)
Expand Down
2 changes: 1 addition & 1 deletion driver/scheduler/scheduler_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/chrislusf/glow/resource"
)

func (s *Scheduler) Score(r market.Requirement, bid int, obj market.Object) float64 {
func (s *Scheduler) Score(r market.Requirement, bid float64, obj market.Object) float64 {
tg, loc := r.(*TaskGroup), obj.(resource.Allocation).Location
firstTask := tg.Tasks[0]
cost := float64(1)
Expand Down
58 changes: 5 additions & 53 deletions driver/task_runner.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package driver

import (
"bytes"
"encoding/gob"
"flag"
"log"
"reflect"
"strings"
"sync"

"github.com/chrislusf/glow/driver/scheduler"
"github.com/chrislusf/glow/flow"
"github.com/chrislusf/glow/io"
)

type TaskOption struct {
Expand Down Expand Up @@ -62,6 +60,7 @@ func (tr *TaskRunner) Run(fc *flow.FlowContext) {
tr.connectInputsAndOutputs(&wg)
// 6. starts to run the task locally
for _, task := range tr.Tasks {
// println("run task", task.Name())
wg.Add(1)
go func(task *flow.Task) {
defer wg.Done()
Expand Down Expand Up @@ -117,11 +116,11 @@ func (tr *TaskRunner) connectExternalInputs(wg *sync.WaitGroup, name2Location ma
d := shard.Parent
readChanName := shard.Name()
// println("taskGroup", tr.option.TaskGroupId, "task", task.Name(), "trying to read from:", readChanName, len(task.InputChans))
rawChan, err := GetDirectReadChannel(readChanName, name2Location[readChanName])
rawChan, err := io.GetDirectReadChannel(readChanName, name2Location[readChanName])
if err != nil {
log.Panic(err)
}
task.InputChans[i] = rawReadChannelToTyped(rawChan, d.Type, wg)
io.ConnectRawReadChannelToTyped(rawChan, task.InputChans[i], d.Type, wg)
}
}

Expand All @@ -134,53 +133,6 @@ func (tr *TaskRunner) connectExternalOutputs(wg *sync.WaitGroup) {
if err != nil {
log.Panic(err)
}
connectTypedWriteChannelToRaw(shard.WriteChan, rawChan, wg)
io.ConnectTypedWriteChannelToRaw(shard.WriteChan, rawChan, wg)
}
}

// rawReadChannelToTyped starts a goroutine that gob-decodes each []byte
// received from c into a newly allocated value of type t and forwards the
// decoded value on the returned channel.
//
// The goroutine is added to wg before it starts and marks itself done when it
// exits. The returned channel is unbuffered and is closed after c has been
// closed and drained. A decode failure ends the process via log.Fatal.
func rawReadChannelToTyped(c chan []byte, t reflect.Type, wg *sync.WaitGroup) chan reflect.Value {
	typed := make(chan reflect.Value)

	wg.Add(1)
	go func() {
		defer wg.Done()

		for payload := range c {
			ptr := reflect.New(t)
			// Each payload is decoded with its own decoder.
			err := gob.NewDecoder(bytes.NewReader(payload)).DecodeValue(ptr)
			if err != nil {
				log.Fatal("data type:", ptr.Kind(), " decode error:", err)
			}
			typed <- ptr.Elem()
		}

		close(typed)
	}()

	return typed
}

// connectTypedWriteChannelToRaw starts a goroutine that receives values from
// writeChan (a reflect.Value wrapping a channel), gob-encodes each one with a
// fresh encoder, and sends the encoded bytes on c.
//
// The goroutine is added to wg before it starts and marks itself done when it
// exits. Once writeChan is closed and drained, c is closed. An encode failure
// ends the process via log.Fatal.
func connectTypedWriteChannelToRaw(writeChan reflect.Value, c chan []byte, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()

		for {
			item, open := writeChan.Recv()
			if !open {
				break
			}

			var encoded bytes.Buffer
			if err := gob.NewEncoder(&encoded).EncodeValue(item); err != nil {
				log.Fatal("data type:", item.Type().String(), " ", item.Kind(), " encode error:", err)
			}
			c <- encoded.Bytes()
		}

		close(c)
	}()
}
3 changes: 3 additions & 0 deletions flow/context_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type TaskRunner interface {

func Ready() {
if taskRunner.IsTaskMode() {
for _, fc := range Contexts {
fc.Run()
}
os.Exit(0)
} else if contextRunner.IsDriverMode() {
if contextRunner.IsDriverPlotMode() {
Expand Down
Loading

0 comments on commit e757a33

Please sign in to comment.