From 6fbcc14686fa80203124732e755c28289927db55 Mon Sep 17 00:00:00 2001 From: Yaxiong Zhao Date: Tue, 30 Aug 2016 20:25:51 -0700 Subject: [PATCH] Remove the outputType parameter of collectOutput. That information can be obtained from DataSet.Type. --- flow/dataset_map_test.go | 6 +++--- flow/dataset_output.go | 12 ++++++------ flow/dataset_output_test.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flow/dataset_map_test.go b/flow/dataset_map_test.go index 05d2734..0a981bf 100644 --- a/flow/dataset_map_test.go +++ b/flow/dataset_map_test.go @@ -13,7 +13,7 @@ func TestMapSingleParameter(t *testing.T) { }).Map(func(x, y int) int { return x + y }) - got := collectOutput(dataset, reflect.TypeOf(int(1))) + got := collectOutput(dataset) if want := []int{2, 4, 6}; !reflect.DeepEqual(got, want) { t.Errorf("Got: %v want: %v", got, want) @@ -29,7 +29,7 @@ func TestGroupByKeyMap(t *testing.T) { return append([]int{key}, values...) }) - got := collectOutput(dataset, reflect.TypeOf([]int{})) + got := collectOutput(dataset) if want := [][]int{{1, 2, 2}, {2, 4, 4}, {3, 6, 6}}; !reflect.DeepEqual(got, want) { t.Errorf("Got %v want %v", got, want) @@ -63,7 +63,7 @@ func TestCoGroupMap(t *testing.T) { } }) - got := collectOutput(cogroupResult, reflect.TypeOf(result{})) + got := collectOutput(cogroupResult) want := []result{ { key: 1, diff --git a/flow/dataset_output.go b/flow/dataset_output.go index a4a89c5..254fb3e 100644 --- a/flow/dataset_output.go +++ b/flow/dataset_output.go @@ -85,18 +85,18 @@ func (d *Dataset) SaveTextToFile(fname string) { } // collectOutput collects the output of d and returns them as a slice of the -// input type outputType. +// type specified in d.Type. // -// collectOutput intends to be used in tests. The implementation does not optimize -// for performance or guarantee correctness under a wide range of use cases. -func collectOutput(d *Dataset, outputType reflect.Type) interface{} { - outChan := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, outputType), 0) +// Intends to be used in tests. The implementation does not optimize for performance +// or guarantee correctness under a wide range of use cases. +func collectOutput(d *Dataset) interface{} { + outChan := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, d.Type), 0) d.AddOutput(outChan.Interface()) var wg sync.WaitGroup wg.Add(1) - got := reflect.MakeSlice(reflect.SliceOf(outputType), 0, 1) + got := reflect.MakeSlice(reflect.SliceOf(d.Type), 0, 1) go func() { defer wg.Done() for v, ok := outChan.Recv(); ok; v, ok = outChan.Recv() { diff --git a/flow/dataset_output_test.go b/flow/dataset_output_test.go index f60237a..704dbeb 100644 --- a/flow/dataset_output_test.go +++ b/flow/dataset_output_test.go @@ -13,7 +13,7 @@ func TestCollectOutput(t *testing.T) { output <- "c" }, 1) - got := collectOutput(dataset, reflect.TypeOf("")) + got := collectOutput(dataset) if want := []string{"a", "b", "c"}; !reflect.DeepEqual(got, want) { t.Errorf("Got %v want %v", got, want)