forked from ReactiveX/RxGo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsingle.go
151 lines (129 loc) · 3.16 KB
/
single.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package rxgo
import (
"context"
"github.com/pkg/errors"
)
// Single is an Observable-like source that produces at most one notification:
// either a single element or an error.
type Single interface {
	Iterable

	// Subscribe attaches handler to the Single and returns the Observer built
	// from it.
	Subscribe(handler EventHandler, opts ...Option) Observer
	// Map yields a new Single whose element is the result of passing the
	// source element through apply.
	Map(apply Function) Single
	// Filter tests the element against apply and exposes the outcome as an
	// OptionalSingle.
	Filter(apply Predicate) OptionalSingle
}
// OptionalSingle represents an optional single observable type: a source that
// delivers one Optional value to a subscriber.
type OptionalSingle interface {
// Subscribe attaches handler to the source and returns the Observer built
// from it.
Subscribe(handler EventHandler, opts ...Option) Observer
}
// single is the Single implementation in this file. It wraps an Iterable and
// reads its element from it.
type single struct {
// iterable is the source that Iterator, Filter, Map and Subscribe obtain
// their iterator from.
iterable Iterable
}
// optionalSingle is the OptionalSingle implementation in this file. It is
// backed by a channel of Optional values.
type optionalSingle struct {
// itemChannel is the channel Subscribe receives its one value from.
itemChannel chan Optional
}
// newSingleFrom wraps item in a cold Single: the producer handed to
// newColdSingle writes item to its output channel and then closes it.
func newSingleFrom(item interface{}) Single {
	return newColdSingle(func(sink chan interface{}) {
		sink <- item
		close(sink)
	})
}
// newOptionalSingleFrom returns an OptionalSingle that delivers opt to its
// subscriber.
//
// The backing channel has a one-slot buffer, so opt is queued and the channel
// closed synchronously. No goroutine is started, and nothing is left blocked
// if the returned value is never subscribed to. A receiver still gets opt
// first, because values buffered before close remain readable.
func newOptionalSingleFrom(opt Optional) OptionalSingle {
	ch := make(chan Optional, 1)
	ch <- opt
	close(ch)
	return &optionalSingle{
		itemChannel: ch,
	}
}
// newColdSingle builds a Single whose underlying iterable is created from the
// producer function f via newIterableFromFunc.
func newColdSingle(f func(chan interface{})) Single {
	source := newIterableFromFunc(f)
	return &single{iterable: source}
}
// NewOptionalSingleFromChannel returns an OptionalSingle backed directly by
// ch; the value a subscriber observes is the one received from that channel.
func NewOptionalSingleFromChannel(ch chan Optional) OptionalSingle {
	wrapped := new(optionalSingle)
	wrapped.itemChannel = ch
	return wrapped
}
// Iterator returns an iterator over the underlying iterable.
//
// The caller's ctx is forwarded to the underlying iterable, so cancellation
// and deadlines set by the caller are no longer dropped. A nil ctx falls back
// to context.Background(), which keeps callers that relied on ctx being
// ignored working.
func (s *single) Iterator(ctx context.Context) Iterator {
	if ctx == nil {
		ctx = context.Background()
	}
	return s.iterable.Iterator(ctx)
}
// Filter reads the element of the Single on a new goroutine and tests it with
// apply. The returned OptionalSingle delivers Of(item) when apply reports true
// and EmptyOptional() otherwise.
//
// If no element can be read (Next returns an error), an empty Optional is
// delivered as well. The channel is closed on every path, so a subscriber can
// never be left blocked on a channel that nobody writes to or closes.
func (s *single) Filter(apply Predicate) OptionalSingle {
	out := make(chan Optional)
	go func() {
		// Close on every exit path; previously the failure path returned
		// without closing and any subscriber blocked forever.
		defer close(out)

		it := s.iterable.Iterator(context.Background())
		item, err := it.Next(context.Background())
		if err != nil {
			// No element available: report absence explicitly.
			out <- EmptyOptional()
			return
		}
		if apply(item) {
			out <- Of(item)
			return
		}
		out <- EmptyOptional()
	}()
	return &optionalSingle{
		itemChannel: out,
	}
}
// Map returns a cold Single whose element is apply(item), where item is the
// element read from this Single.
//
// The producer closes its output channel on every path. If no element can be
// read (Next returns an error), nothing is sent and the channel is simply
// closed, instead of being left open as before.
// NOTE(review): assumes the iterable built by newIterableFromFunc treats a
// closed channel as end of stream — confirm against its implementation.
func (s *single) Map(apply Function) Single {
	f := func(out chan interface{}) {
		defer close(out)

		it := s.iterable.Iterator(context.Background())
		item, err := it.Next(context.Background())
		if err != nil {
			return
		}
		out <- apply(item)
	}
	return newColdSingle(f)
}
// Subscribe builds an Observer from handler and returns it immediately. On a
// new goroutine it reads one value from the underlying iterable and forwards
// it to that observer:
//   - a value implementing error is passed to OnError;
//   - any other value is passed to OnNext, after which the observer is disposed;
//   - if no value can be read, OnDone is signalled.
//
// opts is accepted but not used here. A non-nil error returned by any of the
// observer callbacks makes the goroutine panic with a wrapped error.
func (s *single) Subscribe(handler EventHandler, opts ...Option) Observer {
	ob := NewObserver(handler)
	go func() {
		source := s.iterable.Iterator(context.Background())
		value, nextErr := source.Next(context.Background())

		// Nothing to read: signal completion only.
		if nextErr != nil {
			if doneErr := ob.OnDone(); doneErr != nil {
				panic(errors.Wrap(doneErr, "error while sending done signal from single"))
			}
			return
		}

		// Error values are routed to OnError; the observer is not disposed here.
		if failure, isErr := value.(error); isErr {
			if sendErr := ob.OnError(failure); sendErr != nil {
				panic(errors.Wrap(sendErr, "error while sending error item from single"))
			}
			return
		}

		if sendErr := ob.OnNext(value); sendErr != nil {
			panic(errors.Wrap(sendErr, "error while sending next item from single"))
		}
		ob.Dispose()
	}()
	return ob
}
// Subscribe builds an Observer from handler and returns it immediately. On a
// new goroutine it receives one value from the backing channel and forwards it
// to that observer: a value implementing error is passed to OnError, anything
// else is passed to OnNext and the observer is then disposed.
//
// opts is accepted but not used here. A non-nil error returned by an observer
// callback makes the goroutine panic with a wrapped error.
func (s *optionalSingle) Subscribe(handler EventHandler, opts ...Option) Observer {
	ob := NewObserver(handler)
	go func() {
		received := <-s.itemChannel

		// Error values are routed to OnError; the observer is not disposed here.
		if failure, isErr := received.(error); isErr {
			if sendErr := ob.OnError(failure); sendErr != nil {
				panic(errors.Wrap(sendErr, "error while sending error item from optional single"))
			}
			return
		}

		if sendErr := ob.OnNext(received); sendErr != nil {
			panic(errors.Wrap(sendErr, "error while sending next item from optional single"))
		}
		ob.Dispose()
	}()
	return ob
}