Skip to content

Commit

Permalink
add optional channel buffer size
Browse files Browse the repository at this point in the history
This may need more thinking.
  • Loading branch information
chrislusf committed Nov 2, 2015
1 parent 18e77e7 commit 0b1b4af
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 15 deletions.
2 changes: 1 addition & 1 deletion driver/scheduler/scheduler_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *Scheduler) setupOutputChannels(shards []*flow.DatasetShard, waitGroup *
// connect remote raw chan to local typed chan
readChanName := shard.Name()
location := s.datasetShard2Location[readChanName]
rawChan, err := netchan.GetDirectReadChannel(readChanName, location.URL())
rawChan, err := netchan.GetDirectReadChannel(readChanName, location.URL(), 1024)
if err != nil {
log.Panic(err)
}
Expand Down
10 changes: 6 additions & 4 deletions driver/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func init() {
}

type TaskRunner struct {
option *TaskOption
Tasks []*flow.Task
option *TaskOption
Tasks []*flow.Task
FlowContext *flow.FlowContext
}

func NewTaskRunner(option *TaskOption) *TaskRunner {
Expand All @@ -53,6 +54,7 @@ func (tr *TaskRunner) Run(fc *flow.FlowContext) {
taskGroups := plan.GroupTasks(fc)

tr.Tasks = taskGroups[tr.option.TaskGroupId].Tasks
tr.FlowContext = fc

if len(tr.Tasks) == 0 {
log.Println("How can the task group has no tasks!")
Expand Down Expand Up @@ -122,7 +124,7 @@ func (tr *TaskRunner) connectExternalInputs(wg *sync.WaitGroup, name2Location ma
d := shard.Parent
readChanName := shard.Name()
// println("taskGroup", tr.option.TaskGroupId, "task", task.Name(), "trying to read from:", readChanName, len(task.InputChans))
rawChan, err := netchan.GetDirectReadChannel(readChanName, name2Location[readChanName])
rawChan, err := netchan.GetDirectReadChannel(readChanName, name2Location[readChanName], tr.FlowContext.ChannelBufferSize)
if err != nil {
log.Panic(err)
}
Expand All @@ -139,7 +141,7 @@ func (tr *TaskRunner) connectExternalInputChannels(wg *sync.WaitGroup) {
ds := firstTask.Outputs[0].Parent
for i, _ := range ds.ExternalInputChans {
inputChanName := fmt.Sprintf("ct-%d-input-%d-p-%d", tr.option.ContextId, ds.Id, i)
rawChan, err := netchan.GetLocalReadChannel(inputChanName)
rawChan, err := netchan.GetLocalReadChannel(inputChanName, tr.FlowContext.ChannelBufferSize)
if err != nil {
log.Panic(err)
}
Expand Down
7 changes: 4 additions & 3 deletions flow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
var Contexts []*FlowContext

type FlowContext struct {
Id int
Steps []*Step
Datasets []*Dataset
Id int
Steps []*Step
Datasets []*Dataset
ChannelBufferSize int
}

func New() (fc *FlowContext) {
Expand Down
2 changes: 1 addition & 1 deletion glow.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {
case receiver.FullCommand():
target := r.FindTarget(*receiveFromChanName, *receiverMaster)
rc := r.NewReceiveChannel(*receiveFromChanName, 0)
recvChan, err := rc.GetDirectChannel(target)
recvChan, err := rc.GetDirectChannel(target, 128)
if err != nil {
panic(err)
}
Expand Down
8 changes: 4 additions & 4 deletions netchan/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ func GetLocalSendChannel(name string, wg *sync.WaitGroup) (chan []byte, error) {
return sender.NewSendChannel(name, networkContext.AgentPort, wg)
}

func GetLocalReadChannel(name string) (chan []byte, error) {
return GetDirectReadChannel(name, "localhost:"+strconv.Itoa(networkContext.AgentPort))
func GetLocalReadChannel(name string, chanBufferSize int) (chan []byte, error) {
return GetDirectReadChannel(name, "localhost:"+strconv.Itoa(networkContext.AgentPort), chanBufferSize)
}

func GetDirectReadChannel(name, location string) (chan []byte, error) {
func GetDirectReadChannel(name, location string, chanBufferSize int) (chan []byte, error) {
rc := receiver.NewReceiveChannel(name, 0)
return rc.GetDirectChannel(location)
return rc.GetDirectChannel(location, chanBufferSize)
}

func GetDirectSendChannel(name string, target string, wg *sync.WaitGroup) (chan []byte, error) {
Expand Down
4 changes: 2 additions & 2 deletions netchan/receiver/receiver_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func NewReceiveChannel(name string, offset uint64) *ReceiveChannel {
}

// Not thread safe
func (rc *ReceiveChannel) GetDirectChannel(target string) (chan []byte, error) {
func (rc *ReceiveChannel) GetDirectChannel(target string, chanBufferSize int) (chan []byte, error) {
if rc.Ch != nil {
return rc.Ch, nil
}
rc.Ch = make(chan []byte)
rc.Ch = make(chan []byte, chanBufferSize)
go func() {
rc.receiveTopicFrom(target)
}()
Expand Down

0 comments on commit 0b1b4af

Please sign in to comment.