forked from ReactiveX/RxGo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiterable.go
68 lines (54 loc) · 1.26 KB
/
iterable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package rxgo
import "context"
// Iterable creates an iterator
type Iterable interface {
Iterator(ctx context.Context) Iterator
}
type iterableFromChannel struct {
ch chan interface{}
}
type iterableFromSlice struct {
s []interface{}
}
type iterableFromRange struct {
start int
count int
}
type iterableFromFunc struct {
f func(chan interface{})
}
func (it *iterableFromFunc) Iterator(ctx context.Context) Iterator {
out := make(chan interface{})
go it.f(out)
return newIteratorFromChannel(out)
}
func (it *iterableFromChannel) Iterator(ctx context.Context) Iterator {
return newIteratorFromChannel(it.ch)
}
func (it *iterableFromSlice) Iterator(ctx context.Context) Iterator {
return newIteratorFromSlice(it.s)
}
func (it *iterableFromRange) Iterator(ctx context.Context) Iterator {
return newIteratorFromRange(it.start-1, it.start+it.count)
}
func newIterableFromChannel(ch chan interface{}) Iterable {
return &iterableFromChannel{
ch: ch,
}
}
func newIterableFromSlice(s []interface{}) Iterable {
return &iterableFromSlice{
s: s,
}
}
func newIterableFromRange(start, count int) Iterable {
return &iterableFromRange{
start: start,
count: count,
}
}
func newIterableFromFunc(f func(chan interface{})) Iterable {
return &iterableFromFunc{
f: f,
}
}