Skip to content

Commit

Permalink
Merge pull request chrislusf#73 from justicezyx/rewrite_collect_output
Browse files Browse the repository at this point in the history
Remove the outputType parameter of collectOutput.
  • Loading branch information
justicezyx authored Aug 31, 2016
2 parents 3ec8daf + 6fbcc14 commit 0a47e84
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
6 changes: 3 additions & 3 deletions flow/dataset_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestMapSingleParameter(t *testing.T) {
}).Map(func(x, y int) int {
return x + y
})
got := collectOutput(dataset, reflect.TypeOf(int(1)))
got := collectOutput(dataset)

if want := []int{2, 4, 6}; !reflect.DeepEqual(got, want) {
t.Errorf("Got: %v want: %v", got, want)
Expand All @@ -29,7 +29,7 @@ func TestGroupByKeyMap(t *testing.T) {
return append([]int{key}, values...)
})

got := collectOutput(dataset, reflect.TypeOf([]int{}))
got := collectOutput(dataset)

if want := [][]int{{1, 2, 2}, {2, 4, 4}, {3, 6, 6}}; !reflect.DeepEqual(got, want) {
t.Errorf("Got %v want %v", got, want)
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestCoGroupMap(t *testing.T) {
}
})

got := collectOutput(cogroupResult, reflect.TypeOf(result{}))
got := collectOutput(cogroupResult)
want := []result{
{
key: 1,
Expand Down
12 changes: 6 additions & 6 deletions flow/dataset_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@ func (d *Dataset) SaveTextToFile(fname string) {
}

// collectOutput collects the output of d and returns them as a slice of the
// input type outputType.
// type specified in d.Type.
//
// collectOutput intends to be used in tests. The implementation does not optimize
// for performance or guarantee correctness under a wide range of use cases.
func collectOutput(d *Dataset, outputType reflect.Type) interface{} {
outChan := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, outputType), 0)
// Intends to be used in tests. The implementation does not optimize for performance
// or guarantee correctness under a wide range of use cases.
func collectOutput(d *Dataset) interface{} {
outChan := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, d.Type), 0)
d.AddOutput(outChan.Interface())

var wg sync.WaitGroup
wg.Add(1)

got := reflect.MakeSlice(reflect.SliceOf(outputType), 0, 1)
got := reflect.MakeSlice(reflect.SliceOf(d.Type), 0, 1)
go func() {
defer wg.Done()
for v, ok := outChan.Recv(); ok; v, ok = outChan.Recv() {
Expand Down
2 changes: 1 addition & 1 deletion flow/dataset_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestCollectOutput(t *testing.T) {
output <- "c"
}, 1)

got := collectOutput(dataset, reflect.TypeOf(""))
got := collectOutput(dataset)

if want := []string{"a", "b", "c"}; !reflect.DeepEqual(got, want) {
t.Errorf("Got %v want %v", got, want)
Expand Down

0 comments on commit 0a47e84

Please sign in to comment.