forked from hallgren/eventsourcing
-
Notifications
You must be signed in to change notification settings - Fork 1
/
eventstream.go
116 lines (105 loc) · 3.79 KB
/
eventstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package eventsourcing
import (
"fmt"
"reflect"
"sync"
)
// EventStream keeps the registered subscriber functions and hands published
// events over to them.
type EventStream struct {
	// aggregateTypes: subscribers of the events of an aggregate type.
	aggregateTypes map[string][]func(Event)
	// specificAggregates: meant for subscribers of one particular aggregate,
	// identified by aggregate type and identifier.
	specificAggregates map[string][]func(Event)
	// specificEvents: subscribers of particular events, keyed by a
	// reflect.Type.
	specificEvents map[reflect.Type][]func(Event)
	// allEvents: subscribers that receive every event.
	allEvents []func(Event)
	// publishLock is held while events are published so that they are
	// delivered in order.
	publishLock sync.Mutex
}
// NewEventStream returns an EventStream whose subscriber maps are allocated
// and whose list of all-event subscribers is empty, ready for Subscribe* and
// Update calls.
func NewEventStream() *EventStream {
	stream := new(EventStream)
	stream.aggregateTypes = map[string][]func(Event){}
	stream.specificAggregates = map[string][]func(Event){}
	stream.specificEvents = map[reflect.Type][]func(Event){}
	// Kept as an empty, non-nil slice.
	stream.allEvents = make([]func(Event), 0)
	return stream
}
// Update delivers each event in events, in slice order, to the functions
// subscribed to it. For every event the subscribers are called in this order:
// subscribers of the event's data type, subscribers of the aggregate type,
// subscribers of the specific aggregate (type and id), and finally the
// subscribers of all events.
//
// publishLock is held for the whole call so that concurrent Update calls do
// not interleave their events. Because sync.Mutex is not reentrant, a
// subscriber must not call Update on the same stream.
func (e *EventStream) Update(events []Event) {
	e.publishLock.Lock()
	// Deferred so the lock is released even when a subscriber panics;
	// otherwise every later Update would block forever.
	defer e.publishLock.Unlock()

	for _, event := range events {
		// Looking up a missing key yields a nil slice and ranging over it is a
		// no-op, so no explicit existence checks are needed below.

		// subscribers of the specific event type
		for _, f := range e.specificEvents[reflect.TypeOf(event.Data)] {
			f(event)
		}
		// subscribers of the aggregate type
		for _, f := range e.aggregateTypes[event.AggregateType] {
			f(event)
		}
		// subscribers of the specific aggregate; the key format must match the
		// one built in SubscribeSpecificAggregate
		ref := fmt.Sprintf("%s_%s", event.AggregateType, event.AggregateRootID)
		for _, f := range e.specificAggregates[ref] {
			f(event)
		}
		// subscribers of all events
		for _, f := range e.allEvents {
			f(event)
		}
	}
}

// SubscribeAll binds f to be called on every event, independent of aggregate
// or event type.
func (e *EventStream) SubscribeAll(f func(e Event)) {
	e.allEvents = append(e.allEvents, f)
}

// SubscribeSpecificAggregate binds f to be called on events that belong to
// any of the given aggregates, matched on aggregate type name and id.
//
// The subscription is stored in specificAggregates under "<type>_<id>", kept
// apart from the aggregate type subscriptions so that a type name can never
// collide with a type/id key. The type name is read with
// reflect.TypeOf(a).Elem(), so each aggregate is expected to be passed as a
// pointer; reflect panics on kinds that have no element type.
func (e *EventStream) SubscribeSpecificAggregate(f func(e Event), aggregates ...aggregate) {
	for _, a := range aggregates {
		aggregateType := reflect.TypeOf(a).Elem().Name()
		ref := fmt.Sprintf("%s_%s", aggregateType, a.id())
		// append on a missing key starts from a nil slice, so first and later
		// subscriptions are handled the same way.
		e.specificAggregates[ref] = append(e.specificAggregates[ref], f)
	}
}
// SubscribeAggregateTypes registers f for the events of every aggregate type
// represented in aggregates. Only the type of each passed aggregate is used:
// its name, read via reflect.TypeOf(...).Elem().Name(), is the key under which
// f is stored.
func (e *EventStream) SubscribeAggregateTypes(f func(e Event), aggregates ...aggregate) {
	for i := range aggregates {
		name := reflect.TypeOf(aggregates[i]).Elem().Name()
		// A first subscription appends to the nil slice of the missing key;
		// later ones extend the existing slice.
		e.aggregateTypes[name] = append(e.aggregateTypes[name], f)
	}
}
// SubscribeSpecificEvent registers f for each of the given events. Every
// value in events serves only as a type sample: f is stored under the value's
// reflect.Type.
func (e *EventStream) SubscribeSpecificEvent(f func(e Event), events ...interface{}) {
	for i := range events {
		key := reflect.TypeOf(events[i])
		// A first subscription appends to the nil slice of the missing key;
		// later ones extend the existing slice.
		e.specificEvents[key] = append(e.specificEvents[key], f)
	}
}