forked from hallgren/eventsourcing
-
Notifications
You must be signed in to change notification settings - Fork 1
/
eventstream.go
241 lines (215 loc) · 6.48 KB
/
eventstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package eventsourcing
import (
"fmt"
"reflect"
"sync"
)
// EventStream dispatches published events to registered subscribers.
// Subscribers can register for all events, for an aggregate type, for a single
// aggregate instance, for a Go event type, or for an aggregate name combined
// with event names.
type EventStream struct {
// lock is held by Publish for a whole batch of events and by every subscribe
// and close call, so events are delivered in order and the subscriber
// collections are never modified in the middle of a publish.
lock sync.Mutex
// aggregateTypes holds subscribers of an aggregate type's events, keyed by
// "<aggregate root path>_<aggregate type name>".
aggregateTypes map[string][]*subscription
// specificAggregates holds subscribers of a single aggregate instance, keyed
// by "<aggregate root path>_<aggregate type name>_<aggregate ID>".
specificAggregates map[string][]*subscription
// specificEvents holds subscribers of specific events, keyed by the
// reflect.Type of the event data.
specificEvents map[reflect.Type][]*subscription
// all holds subscribers that receive every published event.
all []*subscription
// names holds subscribers registered by name, keyed by
// "<aggregate name>_<event name>".
names map[string][]*subscription
}
// subscription pairs the callback that is invoked for each event matching the
// subscription with a close function that ends the subscription.
type subscription struct {
// eventF is called with every matching event. The close functions built by
// the EventStream subscribe methods set it to nil to mark the subscription
// for removal.
eventF func(e Event)
// close ends the subscription; it is what Close invokes.
close func()
}
// Close ends the subscription by running the close function that was installed
// when the subscription was created.
func (s *subscription) Close() {
	stop := s.close
	stop()
}
// NewEventStream returns an EventStream whose subscriber collections are all
// allocated and empty, ready to accept subscriptions.
func NewEventStream() *EventStream {
	stream := new(EventStream)
	stream.all = []*subscription{}
	stream.names = map[string][]*subscription{}
	stream.aggregateTypes = map[string][]*subscription{}
	stream.specificAggregates = map[string][]*subscription{}
	stream.specificEvents = map[reflect.Type][]*subscription{}
	return stream
}
// Publish delivers every event, in slice order, to the subscribers whose
// subscription matches it: first the subscribers of all events, then those of
// the event's Go type, the aggregate type, the specific aggregate instance and
// finally the aggregate/event name.
//
// The stream lock is held for the whole call, so batches from concurrent
// Publish calls are never interleaved. Subscriber callbacks run while the lock
// is held; the subscribe methods and the subscriptions' close functions take
// the same non-reentrant lock, so invoking them from inside a callback blocks.
func (e *EventStream) Publish(agg AggregateRoot, events []Event) {
	e.lock.Lock()
	defer e.lock.Unlock()

	for i := range events {
		ev := events[i]
		e.allPublisher(ev)
		e.specificEventPublisher(ev)
		e.aggregateTypePublisher(agg, ev)
		e.specificAggregatesPublisher(agg, ev)
		e.namePublisher(ev)
	}
}
// allPublisher hands the event to every subscriber registered for all events.
func (e *EventStream) allPublisher(event Event) {
	subs := e.all
	publish(subs, event)
}
// specificEventPublisher hands the event to the subscribers registered for the
// dynamic Go type of the event's Data.
func (e *EventStream) specificEventPublisher(event Event) {
	subs, found := e.specificEvents[reflect.TypeOf(event.Data)]
	if !found {
		return
	}
	publish(subs, event)
}
// aggregateTypePublisher hands the event to the subscribers registered for the
// aggregate type. The lookup key combines the aggregate root's path with the
// event's aggregate type, mirroring the key built when subscribing.
func (e *EventStream) aggregateTypePublisher(agg AggregateRoot, event Event) {
	key := fmt.Sprintf("%s_%s", agg.path(), event.AggregateType)
	subs, found := e.aggregateTypes[key]
	if !found {
		return
	}
	publish(subs, event)
}
// specificAggregatesPublisher hands the event to the subscribers registered for
// this exact aggregate instance (type and ID).
func (e *EventStream) specificAggregatesPublisher(agg AggregateRoot, event Event) {
	// The path is part of the key so that aggregate types with the same name
	// in different packages do not collide.
	key := fmt.Sprintf("%s_%s_%s", agg.path(), event.AggregateType, agg.ID())
	subs, found := e.specificAggregates[key]
	if !found {
		return
	}
	publish(subs, event)
}
// namePublisher hands the event to the subscribers registered by name, i.e.
// for the event's aggregate type combined with the event's reason.
func (e *EventStream) namePublisher(event Event) {
	key := event.AggregateType + "_" + event.Reason()
	subs, found := e.names[key]
	if !found {
		return
	}
	publish(subs, event)
}
// All subscribes f to every event published on the stream. The returned
// subscription stays active until it is closed.
func (e *EventStream) All(f func(e Event)) *subscription {
	sub := &subscription{eventF: f}
	sub.close = func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		// A nil callback marks the subscription for removal by clean.
		sub.eventF = nil
		e.all = clean(e.all)
	}

	e.lock.Lock()
	defer e.lock.Unlock()
	e.all = append(e.all, sub)
	return sub
}
// AggregateID subscribes f to the events of the given aggregate instances,
// identified by their type and ID. The returned subscription covers all the
// listed aggregates and stays active until it is closed.
//
// The type name is taken from reflect.TypeOf(a).Elem(), so each aggregate is
// expected to be passed as a pointer; reflect's Elem panics for kinds that
// have no element type.
func (e *EventStream) AggregateID(f func(e Event), aggregates ...Aggregate) *subscription {
	sub := &subscription{eventF: f}
	sub.close = func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		// A nil callback marks the subscription for removal by clean.
		sub.eventF = nil
		for ref, items := range e.specificAggregates {
			items = clean(items)
			if len(items) == 0 {
				// Drop keys that no longer have subscribers; otherwise every
				// aggregate ID ever subscribed to would stay in the map.
				delete(e.specificAggregates, ref)
				continue
			}
			e.specificAggregates[ref] = items
		}
	}

	e.lock.Lock()
	defer e.lock.Unlock()
	for _, a := range aggregates {
		name := reflect.TypeOf(a).Elem().Name()
		root := a.Root()
		// Same key layout as the one built when publishing:
		// "<root path>_<aggregate type name>_<aggregate ID>".
		ref := fmt.Sprintf("%s_%s_%s", root.path(), name, root.ID())
		e.specificAggregates[ref] = append(e.specificAggregates[ref], sub)
	}
	return sub
}
// Aggregate subscribes f to the events of the given aggregate types. Only the
// type of each passed aggregate matters, not its ID. The returned subscription
// covers all the listed types and stays active until it is closed.
//
// The type name is taken from reflect.TypeOf(a).Elem(), so each aggregate is
// expected to be passed as a pointer; reflect's Elem panics for kinds that
// have no element type.
func (e *EventStream) Aggregate(f func(e Event), aggregates ...Aggregate) *subscription {
	sub := &subscription{eventF: f}
	sub.close = func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		// A nil callback marks the subscription for removal by clean.
		sub.eventF = nil
		for ref, items := range e.aggregateTypes {
			items = clean(items)
			if len(items) == 0 {
				// Drop keys that no longer have subscribers so the map does
				// not accumulate empty entries.
				delete(e.aggregateTypes, ref)
				continue
			}
			e.aggregateTypes[ref] = items
		}
	}

	e.lock.Lock()
	defer e.lock.Unlock()
	for _, a := range aggregates {
		name := reflect.TypeOf(a).Elem().Name()
		root := a.Root()
		// Same key layout as the one built when publishing:
		// "<root path>_<aggregate type name>".
		ref := fmt.Sprintf("%s_%s", root.path(), name)
		e.aggregateTypes[ref] = append(e.aggregateTypes[ref], sub)
	}
	return sub
}
// Event subscribes f to application defined events selected by Go type. Each
// value in events is only used for its dynamic type: a published event matches
// when reflect.TypeOf of its Data equals reflect.TypeOf of one of the values,
// so a pointer and a non-pointer of the same struct are different keys. The
// returned subscription stays active until it is closed.
func (e *EventStream) Event(f func(e Event), events ...interface{}) *subscription {
	sub := &subscription{eventF: f}
	sub.close = func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		// A nil callback marks the subscription for removal by clean.
		sub.eventF = nil
		for ref, items := range e.specificEvents {
			items = clean(items)
			if len(items) == 0 {
				// Drop keys that no longer have subscribers so the map does
				// not accumulate empty entries.
				delete(e.specificEvents, ref)
				continue
			}
			e.specificEvents[ref] = items
		}
	}

	e.lock.Lock()
	defer e.lock.Unlock()
	for _, event := range events {
		ref := reflect.TypeOf(event)
		e.specificEvents[ref] = append(e.specificEvents[ref], sub)
	}
	return sub
}
// Name subscribes f to events selected by the aggregate name combined with
// each of the given event names. Both are plain strings, so no Go type for the
// aggregate or the events is needed to subscribe. A published event matches
// when its AggregateType and Reason() equal the aggregate and one of the event
// names. The returned subscription stays active until it is closed.
func (e *EventStream) Name(f func(e Event), aggregate string, events ...string) *subscription {
	sub := &subscription{eventF: f}
	sub.close = func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		// A nil callback marks the subscription for removal by clean.
		sub.eventF = nil
		for ref, items := range e.names {
			items = clean(items)
			if len(items) == 0 {
				// Drop keys that no longer have subscribers so the map does
				// not accumulate empty entries.
				delete(e.names, ref)
				continue
			}
			e.names[ref] = items
		}
	}

	e.lock.Lock()
	defer e.lock.Unlock()
	for _, event := range events {
		// Same key layout as the one built when publishing:
		// "<aggregate name>_<event name>".
		ref := aggregate + "_" + event
		e.names[ref] = append(e.names[ref], sub)
	}
	return sub
}
// clean returns items without the subscriptions whose event function is nil,
// i.e. the ones that have been closed. The relative order of the remaining
// subscriptions is kept and the backing array of items is reused.
//
// The slice is filtered into its own prefix instead of deleting elements while
// ranging over it: deleting during the range skips the element that follows a
// removed one and can slice past the shortened length.
func clean(items []*subscription) []*subscription {
	kept := items[:0]
	for _, s := range items {
		if s.eventF != nil {
			kept = append(kept, s)
		}
	}
	// Clear the vacated tail so the backing array does not keep the removed
	// subscriptions (and the callbacks they captured) reachable.
	for i := len(kept); i < len(items); i++ {
		items[i] = nil
	}
	return kept
}
// publish calls the event function of every subscription in items with e, in
// slice order.
func publish(items []*subscription, e Event) {
	for i := 0; i < len(items); i++ {
		items[i].eventF(e)
	}
}