Skip to content

Commit

Permalink
event subscription per aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
hallgren committed Feb 2, 2020
1 parent 02eb27c commit cc662e7
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 7 deletions.
31 changes: 26 additions & 5 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (

// EventStream struct what handles event subscription
type EventStream struct {
specificEvents map[reflect.Type][]func(e Event)
allEvents []func(e Event)
publishLock sync.Mutex
aggregateEvents map[string][]func(e Event)
specificEvents map[reflect.Type][]func(e Event)
allEvents []func(e Event)
publishLock sync.Mutex
}

// NewEventStream factory function
func NewEventStream() *EventStream {
return &EventStream{
specificEvents: make(map[reflect.Type][]func(e Event)),
allEvents: []func(e Event){},
aggregateEvents: make(map[string][]func(e Event)),
specificEvents: make(map[reflect.Type][]func(e Event)),
allEvents: []func(e Event){},
}
}

Expand All @@ -33,6 +35,13 @@ func (e *EventStream) Update(events []Event) {
}
}

// call all functions that has registered for the aggregate events
if functions, ok := e.aggregateEvents[event.AggregateType]; ok {
for _, f := range functions {
f(event)
}
}

// call all functions that has registered for all events
for _, f := range e.allEvents {
f(event)
Expand All @@ -46,6 +55,18 @@ func (e *EventStream) SubscribeAll(f func(e Event)) {
e.allEvents = append(e.allEvents, f)
}

// SubscribeAggregate bind the f function to be called on events on the aggregate type
func (e *EventStream) SubscribeAggregate(f func(e Event), a aggregate) {
aggregateType := reflect.TypeOf(a).Elem().Name()
if e.aggregateEvents[aggregateType] == nil {
// add the event type and prop to the empty register key
e.aggregateEvents[aggregateType] = []func(e Event){f}
} else {
// adds one more property to the event type
e.aggregateEvents[aggregateType] = append(e.aggregateEvents[aggregateType], f)
}
}

// SubscribeSpecific bind the f function to be called on specific events
func (e *EventStream) SubscribeSpecific(f func(e Event), events ...interface{}) {
// subscribe to specified events
Expand Down
39 changes: 37 additions & 2 deletions eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@ import (
"github.com/hallgren/eventsourcing"
)

type AnAggregate struct {
eventsourcing.AggregateRoot
}

func (a *AnAggregate) Transition(e eventsourcing.Event) {
}

type AnEvent struct {
Name string
}

type AnotherEvent struct{}

var event = eventsourcing.Event{Version: 123, Data: &AnEvent{Name: "123"}, Reason: "AnEvent"}
var otherEvent = eventsourcing.Event{Version: 123, Data: &AnotherEvent{}, Reason: "AnotherEvent"}
var event = eventsourcing.Event{Version: 123, Data: &AnEvent{Name: "123"}, Reason: "AnEvent", AggregateType: "AnAggregate"}
var otherEvent = eventsourcing.Event{Version: 123, Data: &AnotherEvent{}, Reason: "AnotherEvent", AggregateType: "AnotherAggregate"}

func TestAll(t *testing.T) {
var streamEvent *eventsourcing.Event
Expand Down Expand Up @@ -51,6 +58,24 @@ func TestSpecific(t *testing.T) {
}
}

func TestSubscribeAggregate(t *testing.T) {
var streamEvent *eventsourcing.Event
e := eventsourcing.NewEventStream()
f := func(e eventsourcing.Event) {
streamEvent = &e
}
e.SubscribeAggregate(f, &AnAggregate{})
e.Update([]eventsourcing.Event{event})

if streamEvent == nil {
t.Fatalf("should have received event")
}

if streamEvent.Version != event.Version {
t.Fatalf("wrong info in event got %q expected %q", streamEvent.Version, event.Version)
}
}

func TestManySpecific(t *testing.T) {
var streamEvents []*eventsourcing.Event
e := eventsourcing.NewEventStream()
Expand Down Expand Up @@ -100,6 +125,7 @@ func TestManySubscribers(t *testing.T) {
streamEvent2 := make([]eventsourcing.Event, 0)
streamEvent3 := make([]eventsourcing.Event, 0)
streamEvent4 := make([]eventsourcing.Event, 0)
streamEvent5 := make([]eventsourcing.Event, 0)

e := eventsourcing.NewEventStream()
f1 := func(e eventsourcing.Event) {
Expand All @@ -114,10 +140,15 @@ func TestManySubscribers(t *testing.T) {
f4 := func(e eventsourcing.Event) {
streamEvent4 = append(streamEvent4, e)
}
f5 := func(e eventsourcing.Event) {
streamEvent5 = append(streamEvent5, e)
}
e.SubscribeSpecific(f1, &AnotherEvent{})
e.SubscribeSpecific(f2, &AnotherEvent{}, &AnEvent{})
e.SubscribeSpecific(f3, &AnEvent{})
e.SubscribeAll(f4)
e.SubscribeAggregate(f5, &AnAggregate{})


e.Update([]eventsourcing.Event{event})

Expand All @@ -136,6 +167,10 @@ func TestManySubscribers(t *testing.T) {
if len(streamEvent4) != 1 {
t.Fatalf("stream4 should have one event")
}

if len(streamEvent5) != 1 {
t.Fatalf("stream5 should have one event")
}
}

func TestParallelUpdates(t *testing.T) {
Expand Down

0 comments on commit cc662e7

Please sign in to comment.