Skip to content

Commit

Permalink
Add comments and revise an existing one.
Browse files Browse the repository at this point in the history
  • Loading branch information
justicezyx authored Jul 12, 2016
1 parent ec51e71 commit efb429d
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions flow/dataset_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"sync"
)

// Inputs: f(chan A), shardCount
// Source returns a new Dataset which evenly distributes the data items produced by f
// among multiple shards. f must be a function defined in the form func(chan <some_type>).
func (fc *FlowContext) Source(f interface{}, shard int) (ret *Dataset) {
ret = fc.newNextDataset(shard, guessFunctionOutputType(f))
step := fc.AddOneToAllStep(nil, ret)
Expand Down Expand Up @@ -48,6 +49,8 @@ func (fc *FlowContext) Source(f interface{}, shard int) (ret *Dataset) {
return
}

// TextFile returns a new Dataset which reads the text file fname line by line,
// and distributes them evenly among multiple shards.
func (fc *FlowContext) TextFile(fname string, shard int) (ret *Dataset) {
fn := func(out chan string) {
file, err := os.Open(fname)
Expand All @@ -70,6 +73,8 @@ func (fc *FlowContext) TextFile(fname string, shard int) (ret *Dataset) {
return fc.Source(fn, shard)
}

// Channel returns a new Dataset which has the input channel as the input and sends the received
// values to tasks.
func (fc *FlowContext) Channel(ch interface{}) (ret *Dataset) {
chValue, chType := reflect.ValueOf(ch), reflect.TypeOf(ch)

Expand All @@ -89,7 +94,7 @@ func (fc *FlowContext) doChannel(chValue reflect.Value, chType reflect.Type) (re
return
}

// Slice accepts a slice and send values to tasks via Channel()
// Slice accepts a slice as the input and sends the received values to tasks via Channel().
func (fc *FlowContext) Slice(slice interface{}) (ret *Dataset) {
sliceValue, sliceType := reflect.ValueOf(slice), reflect.TypeOf(slice)
sliceLen := sliceValue.Len()
Expand Down

0 comments on commit efb429d

Please sign in to comment.