forked from hallgren/eventsourcing
-
Notifications
You must be signed in to change notification settings - Fork 1
/
repository.go
112 lines (100 loc) · 3.33 KB
/
repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package eventsourcing
import (
"errors"
"reflect"
)
// EventStore interface expose the methods an event store must uphold
type EventStore interface {
Save(events []Event) error
Get(id string, aggregateType string, afterVersion Version) (EventIterator, error)
}
// SnapshotStore is the contract a snapshot store has to fulfil.
type SnapshotStore interface {
	// Save stores the snapshot s.
	Save(s Snapshot) error
	// Get looks up the snapshot identified by the aggregate id and type name.
	Get(id string, typ string) (Snapshot, error)
}
// Aggregate interface to use the aggregate root specific methods
type Aggregate interface {
Root() *AggregateRoot
Transition(event Event)
}
// Sentinel errors of the repository; compare against them with errors.Is.
var (
	// ErrSnapshotNotFound is returned when no snapshot could be found.
	// Repository.Get tolerates it and falls back to the event store.
	ErrSnapshotNotFound = errors.New("snapshot not found")

	// ErrAggregateNotFound is returned when neither a snapshot nor any event
	// could be found for the aggregate.
	ErrAggregateNotFound = errors.New("aggregate not found")
)
// Repository is the returned instance from the factory function NewRepository.
// It saves and loads aggregates through an EventStore and, optionally, a
// SnapshotHandler.
type Repository struct {
	// EventStream is embedded, so its methods are promoted onto Repository.
	*EventStream
	// eventStore is where aggregate events are saved to and fetched from.
	eventStore EventStore
	// snapshot is optional; when nil, Get skips the snapshot lookup and
	// SaveSnapshot returns an error.
	snapshot *SnapshotHandler
}
// NewRepository builds a Repository on top of the given event store.
// snapshot may be nil, in which case the repository works without snapshot
// support. Every repository gets its own EventStream from NewEventStream.
func NewRepository(eventStore EventStore, snapshot *SnapshotHandler) *Repository {
	repo := new(Repository)
	repo.EventStream = NewEventStream()
	repo.eventStore = eventStore
	repo.snapshot = snapshot
	return repo
}
// Save stores the aggregate's pending events in the event store.
// If the store rejects them the error is returned as is and nothing else
// happens; otherwise the events are passed to Update and the aggregate root's
// internal state is refreshed.
func (r *Repository) Save(aggregate Aggregate) error {
	ar := aggregate.Root()

	// Pass the root's own event slice (not the result of Events()) to the
	// store; per the original note this lets the store set GlobalVersion.
	if err := r.eventStore.Save(ar.aggregateEvents); err != nil {
		return err
	}

	// Publish the saved events to subscribers.
	// NOTE(review): Update is presumably promoted from the embedded
	// EventStream; it receives a copy of the root, not the pointer.
	r.Update(*ar, ar.Events())

	// Bring the aggregate root's internal state up to date.
	ar.update()
	return nil
}
// SaveSnapshot stores the current state of the aggregate through the
// configured SnapshotHandler. It returns an error when the repository was
// created without a snapshot handler.
// NOTE(review): the original contract says the snapshot is only taken when
// the aggregate has no unsaved events; that check is not in this method and
// is presumably enforced by SnapshotHandler.Save.
func (r *Repository) SaveSnapshot(aggregate Aggregate) error {
	if r.snapshot != nil {
		return r.snapshot.Save(aggregate)
	}
	return errors.New("no snapshot store has been initialized")
}
// Get loads the aggregate identified by id into aggregate.
//
// aggregate must be a pointer, otherwise an error is returned. If the
// repository has a snapshot handler it is asked for a snapshot first; a
// missing snapshot (ErrSnapshotNotFound) is not an error. Afterwards the
// events stored after the aggregate's current version are fetched from the
// event store and applied one by one.
//
// It returns ErrAggregateNotFound when there are no events to apply and the
// aggregate's version is still 0 (no snapshot was loaded either). Any other
// error from the snapshot handler, the event store or the iterator is
// returned unchanged.
func (r *Repository) Get(id string, aggregate Aggregate) error {
	if reflect.ValueOf(aggregate).Kind() != reflect.Ptr {
		return errors.New("aggregate needs to be a pointer")
	}

	// If there is a snapshot handler, try to restore the aggregate from a
	// snapshot. Not finding one is fine: the events are replayed instead.
	if r.snapshot != nil {
		err := r.snapshot.Get(id, aggregate)
		if err != nil && !errors.Is(err, ErrSnapshotNotFound) {
			return err
		}
	}

	root := aggregate.Root()
	// The event store is keyed by the name of the concrete aggregate type.
	aggregateType := reflect.TypeOf(aggregate).Elem().Name()

	// Fetch the events after the current version of the aggregate; the
	// version is non-zero when a snapshot was loaded above.
	eventIterator, err := r.eventStore.Get(id, aggregateType, root.Version())
	if errors.Is(err, ErrNoEvents) {
		if root.Version() == 0 {
			// no events and no snapshot
			return ErrAggregateNotFound
		}
		// The snapshot is already up to date. The iterator belongs to a
		// failed call and may be nil, so it must not be closed or read.
		return nil
	}
	if err != nil {
		return err
	}
	defer eventIterator.Close()

	for {
		event, err := eventIterator.Next()
		if errors.Is(err, ErrNoMoreEvents) {
			if root.Version() == 0 {
				// No events and no snapshot (some event stores do not
				// return ErrNoEvents from Get()).
				return ErrAggregateNotFound
			}
			return nil
		}
		if err != nil {
			return err
		}
		// apply the event on the aggregate
		root.BuildFromHistory(aggregate, []Event{event})
	}
}