Skip to content

Commit

Permalink
stop export aggregate root properties directly
Browse files Browse the repository at this point in the history
  • Loading branch information
hallgren committed Dec 19, 2020
1 parent 8b8da72 commit 2aaf914
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 87 deletions.
67 changes: 34 additions & 33 deletions eventsourcing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ type Version int

// AggregateRoot to be included into aggregates
type AggregateRoot struct {
AggregateID string
AggregateVersion Version
AggregateEvents []Event
aggregateID string
aggregateVersion Version
aggregateEvents []Event
}

// Event holding meta data and the application specific event in the Data property
Expand All @@ -30,8 +30,8 @@ type Event struct {
}

var (
// ErrAggregateAlreadyExists returned if the AggregateID is set more than one time
ErrAggregateAlreadyExists = errors.New("its not possible to set id on already existing aggregate")
// ErrAggregateAlreadyExists returned if the aggregateID is set more than one time
ErrAggregateAlreadyExists = errors.New("its not possible to set ID on already existing aggregate")

emptyAggregateID = ""
)
Expand All @@ -47,76 +47,72 @@ func (state *AggregateRoot) TrackChange(a aggregate, data interface{}) {
// meta data is handled by this func to store none related application state
func (state *AggregateRoot) TrackChangeWithMetaData(a aggregate, data interface{}, metaData map[string]interface{}) {
// This can be overwritten in the constructor of the aggregate
if state.AggregateID == emptyAggregateID {
if state.aggregateID == emptyAggregateID {
state.setID(uuid.New().String())
}

reason := reflect.TypeOf(data).Elem().Name()
name := reflect.TypeOf(a).Elem().Name()
event := Event{
AggregateRootID: state.AggregateID,
AggregateRootID: state.aggregateID,
Version: state.nextVersion(),
Reason: reason,
AggregateType: name,
Timestamp: time.Now().UTC(),
Data: data,
MetaData: metaData,
}
state.AggregateEvents = append(state.AggregateEvents, event)
state.aggregateEvents = append(state.aggregateEvents, event)
a.Transition(event)
}

// BuildFromHistory builds the aggregate state from events
func (state *AggregateRoot) BuildFromHistory(a aggregate, events []Event) {
for _, event := range events {
a.Transition(event)
//Set the aggregate id
state.AggregateID = event.AggregateRootID
//Set the aggregate ID
state.aggregateID = event.AggregateRootID
// Make sure the aggregate is in the correct version (the last event)
state.AggregateVersion = event.Version
state.aggregateVersion = event.Version
}
}

func (state *AggregateRoot) nextVersion() Version {
return state.CurrentVersion() + 1
return state.Version() + 1
}

// updateVersion sets the AggregateVersion to the AggregateVersion in the last event if reset the events
// updateVersion sets the aggregateVersion to the aggregateVersion in the last event if reset the events
// called by the Save func in the repository after the events are stored
func (state *AggregateRoot) updateVersion() {
if len(state.AggregateEvents) > 0 {
state.AggregateVersion = state.AggregateEvents[len(state.AggregateEvents)-1].Version
state.AggregateEvents = []Event{}
if len(state.aggregateEvents) > 0 {
state.aggregateVersion = state.aggregateEvents[len(state.aggregateEvents)-1].Version
state.aggregateEvents = []Event{}
}
}

func (state *AggregateRoot) changes() []Event {
return state.AggregateEvents
return state.aggregateEvents
}

// setID is the internal method to set the aggregate id
// setID is the internal method to set the aggregate ID
func (state *AggregateRoot) setID(id string) {
state.AggregateID = id
}

func (state *AggregateRoot) version() Version {
return state.AggregateVersion
state.aggregateID = id
}

//Public accessors for aggregate root properties

// SetID opens up the possibility to set manual aggregate id from the outside
// SetID opens up the possibility to set manual aggregate ID from the outside
func (state *AggregateRoot) SetID(id string) error {
if state.AggregateID != emptyAggregateID {
if state.aggregateID != emptyAggregateID {
return ErrAggregateAlreadyExists
}
state.setID(id)
return nil
}

// id returns the aggregate id as a string
func (state *AggregateRoot) id() string {
return state.AggregateID
// ID returns the aggregate ID as a string
func (state *AggregateRoot) ID() string {
return state.aggregateID
}

// path return the full name of the aggregate making it unique to other aggregates with
Expand All @@ -125,10 +121,15 @@ func (state *AggregateRoot) path() string {
return reflect.TypeOf(state).Elem().PkgPath()
}

// CurrentVersion return the version based on events that are not stored
func (state *AggregateRoot) CurrentVersion() Version {
if len(state.AggregateEvents) > 0 {
return state.AggregateEvents[len(state.AggregateEvents)-1].Version
// Version return the version based on events that are not stored
func (state *AggregateRoot) Version() Version {
if len(state.aggregateEvents) > 0 {
return state.aggregateEvents[len(state.aggregateEvents)-1].Version
}
return state.AggregateVersion
return state.aggregateVersion
}

// Events return the aggregate events from the aggregate
func (state *AggregateRoot) Events() []Event {
return state.aggregateEvents
}
30 changes: 15 additions & 15 deletions eventsourcing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func CreatePerson(name string) (*Person, error) {
return &person, nil
}

// CreatePersonWithID constructor for the Person that sets the aggregate id from the outside
// CreatePersonWithID constructor for the Person that sets the aggregate ID from the outside
func CreatePersonWithID(id, name string) (*Person, error) {
if name == "" {
return nil, errors.New("name can't be blank")
Expand Down Expand Up @@ -84,19 +84,19 @@ func TestCreateNewPerson(t *testing.T) {
t.Fatal("Wrong person Age")
}

if len(person.AggregateEvents) != 1 {
if len(person.Events()) != 1 {
t.Fatal("There should be one event on the person aggregateRoot")
}

if person.CurrentVersion() != 1 {
t.Fatal("Wrong version on the person aggregateRoot", person.AggregateVersion)
if person.Version() != 1 {
t.Fatal("Wrong version on the person aggregateRoot", person.Version())
}

if person.AggregateEvents[0].Timestamp.Before(timeBefore) {
if person.Events()[0].Timestamp.Before(timeBefore) {
t.Fatal("event timestamp before timeBefore")
}

if person.AggregateEvents[0].Timestamp.After(time.Now().UTC()) {
if person.Events()[0].Timestamp.After(time.Now().UTC()) {
t.Fatal("event timestamp after current time")
}
}
Expand All @@ -108,8 +108,8 @@ func TestCreateNewPersonWithIDFromOutside(t *testing.T) {
t.Fatal("Error when creating person", err.Error())
}

if string(person.AggregateID) != id {
t.Fatal("Wrong aggregate id on the person aggregateRoot", person.AggregateID)
if person.ID() != id {
t.Fatal("Wrong aggregate ID on the person aggregateRoot", person.ID())
}
}

Expand All @@ -128,23 +128,23 @@ func TestSetIDOnExistingPerson(t *testing.T) {

err = person.SetID("new_id")
if err == nil {
t.Fatal("Should not be possible to set id on already existing person")
t.Fatal("Should not be possible to set ID on already existing person")
}
}

func TestPersonAgedOneYear(t *testing.T) {
person, _ := CreatePerson("kalle")
person.GrowOlder()

if len(person.AggregateEvents) != 2 {
t.Fatal("There should be two event on the person aggregateRoot", person.AggregateEvents)
if len(person.Events()) != 2 {
t.Fatal("There should be two event on the person aggregateRoot", person.Events())
}

if person.AggregateEvents[len(person.AggregateEvents)-1].Reason != "AgedOneYear" {
t.Fatal("The last event reason should be AgedOneYear", person.AggregateEvents[len(person.AggregateEvents)-1].Reason)
if person.Events()[len(person.Events())-1].Reason != "AgedOneYear" {
t.Fatal("The last event reason should be AgedOneYear", person.Events()[len(person.Events())-1].Reason)
}

d, ok := person.AggregateEvents[1].MetaData["foo"]
d, ok := person.Events()[1].MetaData["foo"]

if !ok {
t.Fatal("meta data not present")
Expand All @@ -154,7 +154,7 @@ func TestPersonAgedOneYear(t *testing.T) {
t.Fatal("wrong meta data")
}

if person.AggregateID == "" {
if person.ID() == "" {
t.Fatal("aggregate ID should not be empty")
}
}
Expand Down
1 change: 0 additions & 1 deletion eventstore/bbolt/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/hallgren/eventsourcing/eventstore/bbolt
go 1.13

require (
github.com/gofrs/uuid v3.3.0+incompatible // indirect
github.com/hallgren/eventsourcing v0.0.10
go.etcd.io/bbolt v1.3.4
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 // indirect
Expand Down
1 change: 0 additions & 1 deletion eventstore/sql/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/hallgren/eventsourcing/eventstore/sql
go 1.13

require (
github.com/gofrs/uuid v3.3.0+incompatible // indirect
github.com/hallgren/eventsourcing v0.0.10
github.com/proullon/ramsql v0.0.0-20181213202341-817cee58a244
)
10 changes: 5 additions & 5 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (e *EventStream) Update(agg aggregate, events []Event) {
}
}

// call all functions that has registered for the aggregate type and id events
// call all functions that has registered for the aggregate type and ID events
// ref also include the package name ensuring that Aggregate Types can have the same name.
ref = fmt.Sprintf("%s_%s", ref, agg.id())
ref = fmt.Sprintf("%s_%s", ref, agg.ID())
if subs, ok := e.specificAggregates[ref]; ok {
for _, s := range subs {
s.f(event)
Expand Down Expand Up @@ -112,7 +112,7 @@ func (e *EventStream) SubscriberAll(f func(e Event)) *Subscription {
return &s
}

// SubscriberSpecificAggregate bind the f function to be called on events that belongs to aggregate based on type and id
// SubscriberSpecificAggregate bind the f function to be called on events that belongs to aggregate based on type and ID
func (e *EventStream) SubscriberSpecificAggregate(f func(e Event), aggregates ...aggregate) *Subscription {
s := Subscription{
f: f,
Expand All @@ -123,7 +123,7 @@ func (e *EventStream) SubscriberSpecificAggregate(f func(e Event), aggregates ..

for _, a := range aggregates {
name := reflect.TypeOf(a).Elem().Name()
ref := fmt.Sprintf("%s_%s_%s", a.path(), name, a.id())
ref := fmt.Sprintf("%s_%s_%s", a.path(), name, a.ID())
for i, sub := range e.specificAggregates[ref] {
if &s == sub {
e.specificAggregates[ref] = append(e.specificAggregates[ref][:i], e.specificAggregates[ref][i+1:]...)
Expand All @@ -138,7 +138,7 @@ func (e *EventStream) SubscriberSpecificAggregate(f func(e Event), aggregates ..

for _, a := range aggregates {
name := reflect.TypeOf(a).Elem().Name()
ref := fmt.Sprintf("%s_%s_%s", a.path(), name, a.id())
ref := fmt.Sprintf("%s_%s_%s", a.path(), name, a.ID())

// adds one more function to the aggregate
e.specificAggregates[ref] = append(e.specificAggregates[ref], &s)
Expand Down
7 changes: 5 additions & 2 deletions eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ func TestSubscribeOneEvent(t *testing.T) {

func TestSubscribeSpecificAggregate(t *testing.T) {
// setup aggregates with identifiers
anAggregate := AnAggregate{eventsourcing.AggregateRoot{AggregateID: "123"}}
anOtherAggregate := AnotherAggregate{eventsourcing.AggregateRoot{AggregateID: "456"}}

anAggregate := AnAggregate{}
anAggregate.SetID("123")
anOtherAggregate := AnotherAggregate{}
anOtherAggregate.SetID("456")

var streamEvent *eventsourcing.Event
e := eventsourcing.NewEventStream()
Expand Down
4 changes: 2 additions & 2 deletions example/frequent_flier_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func (f FrequentFlierAccountAggregate) String() string {
Status: %s
(First Reason: %s)
(First AggregateType %s)
(Pending AggregateEvents: %d)
(AggregateVersion: %d)
(Pending aggregateEvents: %d)
(aggregateVersion: %d)
`
return fmt.Sprintf(format, f.AggregateID, f.miles, f.tierPoints, f.status, reason, aggregateType, len(f.AggregateEvents), f.AggregateVersion)
}
12 changes: 6 additions & 6 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ type eventStore interface {

// snapshotStore interface expose the methods an snapshot store must uphold
type snapshotStore interface {
Get(id string, a interface{}) error
Save(id string, a interface{}) error
Get(id string, a snapshotstore.Snapshot) error
Save(a snapshotstore.Snapshot) error
}

// aggregate interface to use the aggregate root specific methods
type aggregate interface {
id() string
ID() string
path() string
BuildFromHistory(a aggregate, events []Event)
Transition(event Event)
changes() []Event
updateVersion()
version() Version
Version() Version
}

// Repository is the returned instance from the factory function
Expand Down Expand Up @@ -69,7 +69,7 @@ func (r *Repository) SaveSnapshot(aggregate aggregate) error {
if len(aggregate.changes()) > 0 {
return errors.New("can't save snapshot with unsaved events")
}
err := r.snapshotStore.Save(aggregate.id(), aggregate)
err := r.snapshotStore.Save(aggregate)
if err != nil {
return err
}
Expand All @@ -93,7 +93,7 @@ func (r *Repository) Get(id string, aggregate aggregate) error {
}

// fetch events after the current version of the aggregate that could be fetched from the snapshot store
events, err := r.eventStore.Get(id, aggregateType, aggregate.version())
events, err := r.eventStore.Get(id, aggregateType, aggregate.Version())
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func TestSaveAndGetAggregate(t *testing.T) {
t.Fatal("could not save aggregate")
}
twin := Person{}
err = repo.Get(string(person.AggregateID), &twin)
err = repo.Get(person.ID(), &twin)
if err != nil {
t.Fatal("could not get aggregate")
}

// Check internal aggregate version
if person.AggregateVersion != twin.AggregateVersion {
t.Fatalf("Wrong version org %q copy %q", person.AggregateVersion, twin.AggregateVersion)
if person.Version() != twin.Version() {
t.Fatalf("Wrong version org %q copy %q", person.Version(), twin.Version())
}

// Check person Name
Expand Down Expand Up @@ -61,14 +61,14 @@ func TestSaveAndGetAggregateSnapshotAndEvents(t *testing.T) {
person.GrowOlder()
repo.Save(person)
twin := Person{}
err = repo.Get(string(person.AggregateID), &twin)
err = repo.Get(string(person.ID()), &twin)
if err != nil {
t.Fatal("could not get aggregate")
}

// Check internal aggregate version
if person.AggregateVersion != twin.AggregateVersion {
t.Fatalf("Wrong version org %q copy %q", person.AggregateVersion, twin.AggregateVersion)
if person.Version() != twin.Version() {
t.Fatalf("Wrong version org %q copy %q", person.Version(), twin.Version())
}

// Check person Name
Expand Down
2 changes: 1 addition & 1 deletion serializer/serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestSerializeDeserialize(t *testing.T) {
}

if event.Version != 1 {
t.Fatalf("wrong value in AggregateVersion")
t.Fatalf("wrong value in aggregateVersion")
}

if event.Timestamp != timestamp {
Expand Down
Loading

0 comments on commit 2aaf914

Please sign in to comment.