forked from chrislusf/glow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dataset_join_utils.go
155 lines (141 loc) · 3.59 KB
/
dataset_join_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package flow
import (
"log"
"reflect"
)
// DefaultStringComparator is the three-way comparison used for string keys.
// It returns -1 when a sorts before b, 1 when a sorts after b, and 0 when
// the two strings are equal, using Go's built-in string ordering.
func DefaultStringComparator(a, b string) int64 {
	if a < b {
		return -1
	}
	if a > b {
		return 1
	}
	return 0
}
// DefaultFloat64Comparator is the three-way comparison used for float64 keys.
// It returns -1 when a < b, 0 when a == b, and 1 in every other case.
// Because NaN is neither less than nor equal to anything, any comparison
// involving NaN falls through to the final branch and yields 1.
func DefaultFloat64Comparator(a, b float64) int64 {
	if a < b {
		return -1
	}
	if a == b {
		return 0
	}
	return 1
}
// DefaultFloat32Comparator is the three-way comparison used for float32 keys.
// It returns -1 when a < b, 0 when a == b, and 1 in every other case.
// Because NaN is neither less than nor equal to anything, any comparison
// involving NaN falls through to the final branch and yields 1.
func DefaultFloat32Comparator(a, b float32) int64 {
	if a < b {
		return -1
	}
	if a == b {
		return 0
	}
	return 1
}
// orderInt64 is a three-way comparison of two signed integers that cannot
// overflow: it returns -1, 0 or 1 for a < b, a == b and a > b.
func orderInt64(a, b int64) int64 {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}

// orderUint64 is a three-way comparison of two unsigned integers that cannot
// wrap around: it returns -1, 0 or 1 for a < b, a == b and a > b.
func orderUint64(a, b uint64) int64 {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}

// getComparator returns the default comparison function for keys whose
// reflect.Kind matches dt.Kind().
//
// The result is a func(a, b T) int64, boxed in an interface{}, where T is the
// built-in type of that kind (int, int8, ..., uint64, float32, float64 or
// string). The function returns a negative number when a < b, zero when
// a == b and a positive number when a > b.
//
// Integer keys are compared explicitly instead of being subtracted: a
// subtraction is evaluated in the operand type and wraps around, so for
// example uint8(1)-uint8(2) is 255 and would be reported as "greater".
//
// It panics through log.Panicf when dt has a kind with no default comparator.
func getComparator(dt reflect.Type) (funcPointer interface{}) {
	switch dt.Kind() {
	case reflect.Int:
		funcPointer = func(a, b int) int64 { return orderInt64(int64(a), int64(b)) }
	case reflect.Int8:
		funcPointer = func(a, b int8) int64 { return orderInt64(int64(a), int64(b)) }
	case reflect.Int16:
		funcPointer = func(a, b int16) int64 { return orderInt64(int64(a), int64(b)) }
	case reflect.Int32:
		funcPointer = func(a, b int32) int64 { return orderInt64(int64(a), int64(b)) }
	case reflect.Int64:
		funcPointer = func(a, b int64) int64 { return orderInt64(a, b) }
	case reflect.Uint:
		funcPointer = func(a, b uint) int64 { return orderUint64(uint64(a), uint64(b)) }
	case reflect.Uint8:
		funcPointer = func(a, b uint8) int64 { return orderUint64(uint64(a), uint64(b)) }
	case reflect.Uint16:
		funcPointer = func(a, b uint16) int64 { return orderUint64(uint64(a), uint64(b)) }
	case reflect.Uint32:
		funcPointer = func(a, b uint32) int64 { return orderUint64(uint64(a), uint64(b)) }
	case reflect.Uint64:
		funcPointer = func(a, b uint64) int64 { return orderUint64(a, b) }
	case reflect.Float32:
		funcPointer = DefaultFloat32Comparator
	case reflect.Float64:
		funcPointer = DefaultFloat64Comparator
	case reflect.String:
		funcPointer = DefaultStringComparator
	default:
		log.Panicf("No default comparator for %s:%s", dt.String(), dt.Kind().String())
	}
	return funcPointer
}
// getSameKeyValues collects the run of consecutive values on ch whose key
// compares equal to theKey.
//
// Parameters:
//   - ch: the channel that further key/value pairs are read from via
//     getKeyValue.
//   - comparator: reports 0 when two keys are equal.
//   - theKey: the key of the run being collected.
//   - firstValue: a value already read for theKey; it always becomes the
//     first element of theValues.
//   - hasFirstValue: accepted for the callers' convenience but does not
//     influence the result; firstValue is appended regardless.
//
// Returns:
//   - nextKey, nextValue: the first pair read from ch that does not belong to
//     the run (both nil when ch is exhausted).
//   - theValues: firstValue followed by every value read for theKey.
//   - hasValue: false when getKeyValue reported that ch has no more pairs,
//     true when the run ended because a different key was read.
func getSameKeyValues(ch chan reflect.Value,
	comparator func(a, b interface{}) int64,
	theKey, firstValue interface{}, hasFirstValue bool) (
	nextKey, nextValue interface{}, theValues []interface{}, hasValue bool) {

	theValues = append(theValues, firstValue)
	for {
		nextKey, nextValue, hasValue = getKeyValue(ch)
		// Stop at the end of input or at the first pair with another key;
		// that pair is handed back so the caller can start the next run.
		if !hasValue || comparator(theKey, nextKey) != 0 {
			return nextKey, nextValue, theValues, hasValue
		}
		theValues = append(theValues, nextValue)
	}
}
// getKeyValue receives one item from ch and splits it into its key and value.
// The received reflect.Value must wrap a KeyValue; anything else makes the
// type assertion panic. When ch is closed and drained it returns
// (nil, nil, false).
func getKeyValue(ch chan reflect.Value) (key, value interface{}, ok bool) {
	item, open := <-ch
	if !open {
		return nil, nil, false
	}
	pair := item.Interface().(KeyValue)
	return pair.Key, pair.Value, true
}
// valuesWithSameKey pairs a key with the values that were collected for it.
type valuesWithSameKey struct {
// Key is the key shared by every element of Values.
Key interface{}
// Values holds the values gathered under Key.
Values []interface{}
}
// newChannelOfValuesWithSameKey groups consecutive key/value pairs read from
// sortedChan by key and emits one valuesWithSameKey per run of equal keys.
//
// A goroutine reads sortedChan through getKeyValue until that channel is
// closed, then closes the returned channel. sortedChan itself is only
// drained here, never closed. The returned channel is unbuffered, so the
// goroutine blocks until each group is received.
//
// compareFunc is a function taking two keys and returning an integer that is
// 0 when they are equal; it is invoked through reflection. When compareFunc
// is nil, the default comparator for the type of the first key is used.
func newChannelOfValuesWithSameKey(sortedChan chan reflect.Value, compareFunc interface{}) chan valuesWithSameKey {
	outChan := make(chan valuesWithSameKey)
	go func() {
		defer close(outChan)

		key, value, ok := getKeyValue(sortedChan)
		if !ok {
			// Empty input: nothing to group.
			return
		}
		if compareFunc == nil {
			compareFunc = getComparator(reflect.TypeOf(key))
		}
		compareValue := reflect.ValueOf(compareFunc)
		sameKey := func(a, b interface{}) bool {
			args := []reflect.Value{reflect.ValueOf(a), reflect.ValueOf(b)}
			return compareValue.Call(args)[0].Int() == 0
		}

		group := valuesWithSameKey{Key: key, Values: []interface{}{value}}
		for {
			key, value, ok = getKeyValue(sortedChan)
			if !ok {
				break
			}
			if sameKey(group.Key, key) {
				group.Values = append(group.Values, value)
				continue
			}
			// A new key starts: publish the finished group and begin a
			// fresh one with its own slice so the sent group is not mutated.
			outChan <- group
			group = valuesWithSameKey{Key: key, Values: []interface{}{value}}
		}
		// Flush the last group once the input is exhausted.
		outChan <- group
	}()
	return outChan
}