forked from hallgren/eventsourcing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrepository.go
113 lines (99 loc) · 3.25 KB
/
repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package eventsourcing
import (
"errors"
"github.com/hallgren/eventsourcing/snapshotstore"
"reflect"
)
// eventStore is the persistence contract the Repository requires from an event store.
type eventStore interface {
// Save persists the given events. Repository.Save calls it with the pending
// changes of an aggregate and aborts if it returns an error.
Save(events []Event) error
// Get returns stored events for the aggregate identified by id and aggregateType.
// Repository.Get passes the aggregate's current version as afterVersion;
// presumably only events newer than that version are returned — confirm
// against the concrete event store implementations.
Get(id string, aggregateType string, afterVersion Version) ([]Event, error)
}
// snapshotStore is the persistence contract the Repository requires from a snapshot store.
type snapshotStore interface {
// Get looks up the snapshot stored under id. Repository.Get passes the
// aggregate pointer as a (presumably so the store can populate it — confirm
// against the implementations) and tolerates snapshotstore.ErrSnapshotNotFound.
Get(id string, a interface{}) error
// Save stores a under id. Repository.SaveSnapshot calls it with the
// aggregate's id and the aggregate itself.
Save(id string, a interface{}) error
}
// aggregate is the set of aggregate root methods the Repository relies on.
type aggregate interface {
// id is the aggregate identifier; SaveSnapshot uses it as the snapshot key.
id() string
// BuildFromHistory applies previously stored events to the aggregate.
// Repository.Get calls it with the aggregate itself as a and the events
// returned by the event store.
BuildFromHistory(a aggregate, events []Event)
// Transition is not called in this file; presumably it applies a single
// event to the aggregate state — confirm against the aggregate root.
Transition(event Event)
// changes returns the events that have not been saved yet. Save persists
// them and SaveSnapshot refuses to run while there are any.
changes() []Event
// updateVersion is called by Save after the events have been stored and
// published, to update the internal aggregate state.
updateVersion()
// version is the aggregate's current version; Get passes it to the event
// store as the afterVersion argument.
version() Version
}
// Repository is the instance returned by NewRepository. It saves and loads
// aggregates through an event store, optionally uses a snapshot store, and
// publishes saved events on an event stream.
type Repository struct {
// eventStore persists and returns the aggregates' events.
eventStore eventStore
// snapshotStore is optional; Get and SaveSnapshot check it for nil before use.
snapshotStore snapshotStore
// eventStream receives the events of every successful Save and holds the
// functions registered through Subscribe.
eventStream *EventStream
}
// NewRepository builds a Repository around the given event store and snapshot
// store and equips it with a fresh event stream from NewEventStream.
// The snapshot store may be nil: Get then skips the snapshot lookup and
// SaveSnapshot returns an error.
func NewRepository(eventStore eventStore, snapshotStore snapshotStore) *Repository {
	repo := new(Repository)
	repo.eventStore = eventStore
	repo.snapshotStore = snapshotStore
	repo.eventStream = NewEventStream()
	return repo
}
// Save persists the aggregate's unsaved events in the event store and, once
// the store has accepted them, publishes them on the repository's event stream.
//
// The pending events are read from the aggregate exactly once, so the events
// that are published are guaranteed to be the very same ones that were handed
// to the event store.
//
// If the event store returns an error, that error is returned unchanged,
// nothing is published and the aggregate is left untouched.
func (r *Repository) Save(aggregate aggregate) error {
	events := aggregate.changes()
	if err := r.eventStore.Save(events); err != nil {
		return err
	}
	// publish the saved events to subscribers
	r.eventStream.Update(events)
	// the events are stored now, so it is safe to update the internal aggregate state
	aggregate.updateVersion()
	return nil
}
// SaveSnapshot stores the aggregate in the snapshot store under the
// aggregate's id. It returns an error when the repository has no snapshot
// store or when the aggregate still has unsaved events; an error from the
// snapshot store itself is passed through unchanged.
func (r *Repository) SaveSnapshot(aggregate aggregate) error {
	switch {
	case r.snapshotStore == nil:
		return errors.New("no snapshot store has been initialized in the repository")
	case len(aggregate.changes()) != 0:
		// a snapshot must only capture state that is already in the event store
		return errors.New("can't save snapshot with unsaved events")
	}
	return r.snapshotStore.Save(aggregate.id(), aggregate)
}
// Get loads the aggregate identified by id into aggregate.
//
// aggregate must be a non-nil pointer; otherwise an error is returned before
// any store is touched. The name of the pointed-to type is used as the
// aggregate type when querying the event store.
//
// If the repository has a snapshot store, a snapshot is fetched first. A
// missing snapshot (snapshotstore.ErrSnapshotNotFound, also when wrapped by
// the store) is not an error; any other snapshot error is returned unchanged.
// After that the events following the aggregate's current version are fetched
// from the event store and applied through BuildFromHistory. An event store
// error is returned unchanged.
func (r *Repository) Get(id string, aggregate aggregate) error {
	target := reflect.ValueOf(aggregate)
	if target.Kind() != reflect.Ptr {
		return errors.New("aggregate needs to be a pointer")
	}
	// a typed nil pointer passes the kind check but cannot be populated
	if target.IsNil() {
		return errors.New("aggregate needs to be a non-nil pointer")
	}
	aggregateType := target.Type().Elem().Name()

	// if there is a snapshot store try fetch aggregate snapshot
	if r.snapshotStore != nil {
		err := r.snapshotStore.Get(id, aggregate)
		// errors.Is also recognises the sentinel when the store wrapped it
		if err != nil && !errors.Is(err, snapshotstore.ErrSnapshotNotFound) {
			return err
		}
	}

	// fetch events after the current version of the aggregate that could be fetched from the snapshot store
	events, err := r.eventStore.Get(id, aggregateType, aggregate.version())
	if err != nil {
		return err
	}

	// apply the events on the aggregate
	aggregate.BuildFromHistory(aggregate, events)
	return nil
}
// Subscribe registers f on the repository's event stream for the supplied
// events. If no events are supplied, f is meant to be called for every saved
// event; that selection is done by EventStream.Subscribe, not here.
func (r *Repository) Subscribe(f func(e Event), events ...interface{}) {
	// A call without event arguments leaves events nil, and forwarding a nil
	// slice with "..." hands over exactly what a call without arguments would,
	// so one call covers both cases.
	r.eventStream.Subscribe(f, events...)
}