Skip to content

Commit

Permalink
Merge branch 'eventstream_retake' of https://github.com/hallgren/even…
Browse files Browse the repository at this point in the history
…tsourcing into eventstream_retake
  • Loading branch information
hallgren committed Apr 16, 2022
2 parents fea90b2 + 453bd8b commit d50972c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 24 deletions.
8 changes: 4 additions & 4 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (e *EventStream) specificAggregatesPublisher(agg AggregateRoot, event Event

// call functions that has registered for the aggregate type events
func (e *EventStream) aggregateEventNamePublisher(event Event) {
ref := event.AggregateType+"_"+event.Reason()
ref := event.AggregateType + "_" + event.Reason()
if subs, ok := e.names[ref]; ok {
for _, s := range subs {
s.eventF(event)
Expand Down Expand Up @@ -206,8 +206,8 @@ func (e *EventStream) AggregateType(f func(e Event), aggregates ...Aggregate) *s
return &s
}

// SpecificEvent subscribe on specific events where the interface is a pointer to the event struct
func (e *EventStream) SpecificEvent(f func(e Event), events ...interface{}) *subscription {
// Event subscribe on specific events where the interface is a pointer to the event struct
func (e *EventStream) Event(f func(e Event), events ...interface{}) *subscription {
s := subscription{
eventF: f,
}
Expand Down Expand Up @@ -247,7 +247,7 @@ func (e *EventStream) Name(f func(e Event), aggregate string, events ...string)
defer e.lock.Unlock()

for _, event := range events {
ref := aggregate+"_"+event
ref := aggregate + "_" + event
for i, sub := range e.names[ref] {
if &s == sub {
e.names[ref] = append(e.names[ref][:i], e.names[ref][i+1:]...)
Expand Down
23 changes: 14 additions & 9 deletions eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestSubSpecificEvent(t *testing.T) {
f := func(e eventsourcing.Event) {
streamEvent = &e
}
s := e.SpecificEvent(f, &AnEvent{})


s := e.Event(f, &AnEvent{})
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})

Expand Down Expand Up @@ -127,7 +129,8 @@ func TestSubSpecificEventMultiplePublish(t *testing.T) {
f := func(e eventsourcing.Event) {
streamEvents = append(streamEvents, &e)
}
s := e.SpecificEvent(f, &AnEvent{}, &AnotherEvent{})

s := e.Event(f, &AnEvent{}, &AnotherEvent{})
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})
e.Publish(AnotherAggregate{}.AggregateRoot, []eventsourcing.Event{otherEvent})
Expand Down Expand Up @@ -166,7 +169,7 @@ func TestUpdateNoneSubscribedEvent(t *testing.T) {
f := func(e eventsourcing.Event) {
streamEvent = &e
}
s := e.SpecificEvent(f, &AnotherEvent{})
s := e.Event(f, &AnotherEvent{})
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})

Expand Down Expand Up @@ -198,11 +201,12 @@ func TestManySubscribers(t *testing.T) {
f5 := func(e eventsourcing.Event) {
streamEvent5 = append(streamEvent5, e)
}
s := e.SpecificEvent(f1, &AnotherEvent{})

s := e.Event(f1, &AnotherEvent{})
defer s.Close()
s = e.SpecificEvent(f2, &AnotherEvent{}, &AnEvent{})
s = e.Event(f2, &AnotherEvent{}, &AnEvent{})
defer s.Close()
s = e.SpecificEvent(f3, &AnEvent{})
s = e.Event(f3, &AnEvent{})
defer s.Close()
s = e.All(f4)
defer s.Close()
Expand Down Expand Up @@ -246,9 +250,10 @@ func TestParallelPublish(t *testing.T) {
f3 := func(e eventsourcing.Event) {
streamEvent = append(streamEvent, e)
}
s := e.SpecificEvent(f1, &AnEvent{})

s := e.Event(f1, &AnEvent{})
defer s.Close()
s = e.SpecificEvent(f2, &AnotherEvent{})
s = e.Event(f2, &AnotherEvent{})
defer s.Close()
s = e.All(f3)
defer s.Close()
Expand Down Expand Up @@ -289,7 +294,7 @@ func TestClose(t *testing.T) {
count++
}
s1 := e.All(f)
s2 := e.SpecificEvent(f, &AnEvent{})
s2 := e.Event(f, &AnEvent{})
s3 := e.AggregateType(f, &AnAggregate{})
s4 := e.Aggregate(f, &AnAggregate{})

Expand Down
23 changes: 18 additions & 5 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ type Aggregate interface {
Transition(event Event)
}

type EventSubscribers interface {
All(f func(e Event)) *subscription
Aggregate(f func(e Event), aggregates ...Aggregate) *subscription
AggregateType(f func(e Event), aggregates ...Aggregate) *subscription
Event(f func(e Event), events ...interface{}) *subscription
Name(f func(e Event), aggregate string, events ...string) *subscription
}

// ErrSnapshotNotFound returns if snapshot not found
var ErrSnapshotNotFound = errors.New("snapshot not found")

Expand All @@ -38,20 +46,25 @@ var ErrAggregateNotFound = errors.New("aggregate not found")

// Repository is the returned instance from the factory function
type Repository struct {
EventStream *EventStream
eventStore EventStore
snapshot *SnapshotHandler
eventStream *EventStream
eventStore EventStore
snapshot *SnapshotHandler
}

// NewRepository factory function
func NewRepository(eventStore EventStore, snapshot *SnapshotHandler) *Repository {
return &Repository{
eventStore: eventStore,
snapshot: snapshot,
EventStream: NewEventStream(),
eventStream: NewEventStream(),
}
}

// Subscribers returns an interface with all event subscribers
func (r *Repository) Subscribers() EventSubscribers {
return r.eventStream
}

// Save an aggregates events
func (r *Repository) Save(aggregate Aggregate) error {
root := aggregate.Root()
Expand All @@ -61,7 +74,7 @@ func (r *Repository) Save(aggregate Aggregate) error {
return err
}
// publish the saved events to subscribers
r.EventStream.Publish(*root, root.Events())
r.eventStream.Publish(*root, root.Events())

// update the internal aggregate state
root.update()
Expand Down
12 changes: 6 additions & 6 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestSubscriptionAllEvent(t *testing.T) {
counter++
}
repo := eventsourcing.NewRepository(memory.Create(), nil)
s := repo.EventStream.All(f)
s := repo.Subscribers().All(f)
defer s.Close()

person, err := CreatePerson("kalle")
Expand All @@ -251,7 +251,7 @@ func TestSubscriptionSpecificEvent(t *testing.T) {
counter++
}
repo := eventsourcing.NewRepository(memory.Create(), nil)
s := repo.EventStream.SpecificEvent(f, &Born{}, &AgedOneYear{})
s := repo.Subscribers().Event(f, &Born{}, &AgedOneYear{})
defer s.Close()

person, err := CreatePerson("kalle")
Expand All @@ -277,7 +277,7 @@ func TestSubscriptionAggregateType(t *testing.T) {
counter++
}
repo := eventsourcing.NewRepository(memory.Create(), nil)
s := repo.EventStream.AggregateType(f, &Person{})
s := repo.Subscribers().AggregateType(f, &Person{})
defer s.Close()

person, err := CreatePerson("kalle")
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestSubscriptionSpecificAggregate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s := repo.EventStream.Aggregate(f, person)
s := repo.Subscribers().Aggregate(f, person)
defer s.Close()

person.GrowOlder()
Expand Down Expand Up @@ -355,12 +355,12 @@ func TestEventChainDoesNotHang(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s := repo.EventStream.Aggregate(f, person)
s := repo.Subscribers().Aggregate(f, person)
defer s.Close()

// subscribe to all events and filter out AgedOneYear
ageCounter := 0
s2 := repo.EventStream.All(func(e eventsourcing.Event) {
s2 := repo.Subscribers().All(func(e eventsourcing.Event) {
switch e.Data.(type) {
case *AgedOneYear:
// will match three times on the initial person and one each on the resulting AgedOneYear event
Expand Down

0 comments on commit d50972c

Please sign in to comment.