forked from chrislusf/glow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdataset_reduce.go
122 lines (114 loc) · 3.2 KB
/
dataset_reduce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package flow
import (
"reflect"
)
// Reduce collapses a dataset whose elements have type V into at most one V.
//
// The work happens in two phases:
//   - LocalReduce: every task folds its own input with f.
//   - MergeReduce: the partial results are gathered and folded again with f.
//
// f must take two arguments of type V and return one V. Values are combined
// in parallel and in no fixed grouping, so f should be commutative and
// associative for the result to be correct.
func (d *Dataset) Reduce(f interface{}) (ret *Dataset) {
	partials := d.LocalReduce(f)
	ret = partials.MergeReduce(f)
	return ret
}
// LocalReduce adds a step in which each task folds the values it receives
// with f and writes the single folded value to its output. A task that
// receives no values writes nothing.
// It is the first phase of Reduce; f has the same contract as in Reduce.
func (d *Dataset) LocalReduce(f interface{}) *Dataset {
	ret, step := add1ShardTo1Step(d, d.Type)
	step.Name = "LocalReduce"
	step.Function = func(task *Task) {
		outChan := task.Outputs[0].WriteChan
		foldValues(task.InputChan(), f, outChan)
	}
	return ret
}

// MergeReduce adds an all-to-one step that gathers the values of every shard
// of d and folds them with f into at most one value. The resulting dataset is
// created with newNextDataset(1, ...) — presumably a single shard.
// It is the second phase of Reduce; f has the same contract as in Reduce.
func (d *Dataset) MergeReduce(f interface{}) (ret *Dataset) {
	ret = d.context.newNextDataset(1, d.Type)
	step := d.context.AddAllToOneStep(d, ret)
	step.Name = "MergeReduce"
	step.Function = func(task *Task) {
		outChan := task.Outputs[0].WriteChan
		foldValues(task.MergedInputChan(), f, outChan)
	}
	return ret
}

// foldValues drains inputs and left-folds the received values with f
// (acc = f(acc, next)). It then sends the final accumulator to outChan.
// If inputs yields no values, nothing is sent.
// f is invoked through reflection. The first value returned by f becomes the
// new accumulator.
// It is shared by LocalReduce and MergeReduce so both phases fold identically.
func foldValues(inputs <-chan reflect.Value, f interface{}, outChan reflect.Value) {
	fn := reflect.ValueOf(f)
	var acc reflect.Value
	hasValue := false
	for input := range inputs {
		if !hasValue {
			// The first value seeds the accumulator; f is not called for it.
			acc, hasValue = input, true
			continue
		}
		acc = fn.Call([]reflect.Value{acc, input})[0]
	}
	if hasValue {
		outChan.Send(acc)
	}
}
// ReduceByKey takes a dataset of (K, V) pairs and returns a dataset of (K, V)
// pairs in which the values sharing a key have been aggregated by f.
// It is ReduceByUserDefinedKey with a nil comparator, so nil is what gets
// handed to LocalSort and MergeSorted.
// f takes two arguments of type V and returns one V. It should be commutative
// and associative so that the aggregation can be computed correctly in
// parallel.
func (d *Dataset) ReduceByKey(f interface{}) *Dataset {
	return d.ReduceByUserDefinedKey(nil, f)
}
// ReduceByUserDefinedKey is ReduceByKey with a caller-supplied key ordering.
// lessThanFunc orders keys in both the per-shard LocalSort and the
// MergeSorted step; its exact signature is whatever those two accept.
// reducer combines two values of the same key into one. It is applied once
// per shard after sorting and again after the sorted shards are merged.
func (d *Dataset) ReduceByUserDefinedKey(lessThanFunc interface{}, reducer interface{}) *Dataset {
	sorted := d.LocalSort(lessThanFunc)
	partials := sorted.LocalReduceByKey(reducer)
	merged := partials.MergeSorted(lessThanFunc)
	return merged.LocalReduceByKey(reducer)
}
// LocalReduceByKey adds a step in which each task runs foldSameKey over its
// input. This combines, with f, the values of consecutive (K, V) pairs that
// share a key.
// Only adjacent equal keys are merged, so input is expected to be grouped by
// key already (ReduceByKey sorts first).
func (d *Dataset) LocalReduceByKey(f interface{}) *Dataset {
	out, s := add1ShardTo1Step(d, d.Type)
	s.Name = "LocalReduceByKey"
	s.Function = func(task *Task) {
		writeChan := task.Outputs[0].WriteChan
		foldSameKey(task.InputChan(), f, writeChan)
	}
	return out
}
func foldSameKey(inputs chan reflect.Value, f interface{}, outChan reflect.Value) {
var prevKey interface{}
fn := reflect.ValueOf(f)
var localResult reflect.Value
hasValue := false
for input := range inputs {
kv := input.Interface().(KeyValue)
if !hasValue {
hasValue = true
prevKey = kv.Key
localResult = reflect.ValueOf(kv.Value)
} else if !reflect.DeepEqual(prevKey, kv.Key) {
if localResult.IsValid() {
sendKeyValue(outChan, prevKey, localResult.Interface())
}
prevKey = kv.Key
localResult = reflect.ValueOf(kv.Value)
} else {
outs := fn.Call([]reflect.Value{
localResult,
reflect.ValueOf(kv.Value),
})
localResult = outs[0]
}
}
if hasValue {
sendKeyValue(outChan, prevKey, localResult.Interface())
}
}