Skip to content

Commit

Permalink
Remove extra Subscribe metod on eventstream. Close not unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
hallgren committed Apr 16, 2022
1 parent 40b0176 commit fea90b2
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 104 deletions.
102 changes: 45 additions & 57 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,17 @@ type EventStream struct {
names map[string][]*subscription
}

// subscription holding the subscribe / unsubscribe / and func to be called when
// subscription holds the event function to be triggered when an event is triggering the subscription,
// it also hols a close function to end the subscription.
// event matches the subscription
type subscription struct {
eventF func(e Event)
unsubF func()
subF func()
close func()
}

// Unsubscribe stops the subscription
func (s *subscription) Unsubscribe() {
s.unsubF()
}

// Subscribe starts the subscription
func (s *subscription) Subscribe() {
s.subF()
// Close stops the subscription
func (s *subscription) Close() {
s.close()
}

// NewEventStream factory function
Expand Down Expand Up @@ -121,7 +116,7 @@ func (e *EventStream) All(f func(e Event)) *subscription {
s := subscription{
eventF: f,
}
s.unsubF = func() {
s.close = func() {
e.lock.Lock()
defer e.lock.Unlock()

Expand All @@ -132,12 +127,10 @@ func (e *EventStream) All(f func(e Event)) *subscription {
}
}
}
s.subF = func() {
e.lock.Lock()
defer e.lock.Unlock()
e.lock.Lock()
defer e.lock.Unlock()
e.all = append(e.all, &s)

e.all = append(e.all, &s)
}
return &s
}

Expand All @@ -146,7 +139,7 @@ func (e *EventStream) Aggregate(f func(e Event), aggregates ...Aggregate) *subsc
s := subscription{
eventF: f,
}
s.unsubF = func() {
s.close = func() {
e.lock.Lock()
defer e.lock.Unlock()

Expand All @@ -162,19 +155,18 @@ func (e *EventStream) Aggregate(f func(e Event), aggregates ...Aggregate) *subsc
}
}
}
s.subF = func() {
e.lock.Lock()
defer e.lock.Unlock()
e.lock.Lock()
defer e.lock.Unlock()

for _, a := range aggregates {
name := reflect.TypeOf(a).Elem().Name()
root := a.Root()
ref := fmt.Sprintf("%s_%s_%s", root.path(), name, root.ID())
for _, a := range aggregates {
name := reflect.TypeOf(a).Elem().Name()
root := a.Root()
ref := fmt.Sprintf("%s_%s_%s", root.path(), name, root.ID())

// adds one more function to the aggregate
e.specificAggregates[ref] = append(e.specificAggregates[ref], &s)
}
// adds one more function to the aggregate
e.specificAggregates[ref] = append(e.specificAggregates[ref], &s)
}

return &s
}

Expand All @@ -183,7 +175,7 @@ func (e *EventStream) AggregateType(f func(e Event), aggregates ...Aggregate) *s
s := subscription{
eventF: f,
}
s.unsubF = func() {
s.close = func() {
e.lock.Lock()
defer e.lock.Unlock()

Expand All @@ -199,19 +191,18 @@ func (e *EventStream) AggregateType(f func(e Event), aggregates ...Aggregate) *s
}
}
}
s.subF = func() {
e.lock.Lock()
defer e.lock.Unlock()
e.lock.Lock()
defer e.lock.Unlock()

for _, a := range aggregates {
name := reflect.TypeOf(a).Elem().Name()
root := a.Root()
ref := fmt.Sprintf("%s_%s", root.path(), name)
for _, a := range aggregates {
name := reflect.TypeOf(a).Elem().Name()
root := a.Root()
ref := fmt.Sprintf("%s_%s", root.path(), name)

// adds one more function to the aggregate
e.aggregateTypes[ref] = append(e.aggregateTypes[ref], &s)
}
// adds one more function to the aggregate
e.aggregateTypes[ref] = append(e.aggregateTypes[ref], &s)
}

return &s
}

Expand All @@ -220,7 +211,7 @@ func (e *EventStream) SpecificEvent(f func(e Event), events ...interface{}) *sub
s := subscription{
eventF: f,
}
s.unsubF = func() {
s.close = func() {
e.lock.Lock()
defer e.lock.Unlock()

Expand All @@ -234,17 +225,15 @@ func (e *EventStream) SpecificEvent(f func(e Event), events ...interface{}) *sub
}
}
}
s.subF = func() {
e.lock.Lock()
defer e.lock.Unlock()
e.lock.Lock()
defer e.lock.Unlock()

// subscribe to specified events
for _, event := range events {
t := reflect.TypeOf(event)
// adds one more property to the event type
e.specificEvents[t] = append(e.specificEvents[t], &s)
}
for _, event := range events {
t := reflect.TypeOf(event)
// adds one more property to the event type
e.specificEvents[t] = append(e.specificEvents[t], &s)
}

return &s
}

Expand All @@ -253,7 +242,7 @@ func (e *EventStream) Name(f func(e Event), aggregate string, events ...string)
s := subscription{
eventF: f,
}
s.unsubF = func() {
s.close = func() {
e.lock.Lock()
defer e.lock.Unlock()

Expand All @@ -267,14 +256,13 @@ func (e *EventStream) Name(f func(e Event), aggregate string, events ...string)
}
}
}
s.subF = func() {
e.lock.Lock()
defer e.lock.Unlock()
e.lock.Lock()
defer e.lock.Unlock()

for _, event := range events {
ref := aggregate+"_"+event
e.names[ref] = append(e.names[ref], &s)
}
for _, event := range events {
ref := aggregate+"_"+event
e.names[ref] = append(e.names[ref], &s)
}

return &s
}
54 changes: 19 additions & 35 deletions eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func TestSubAll(t *testing.T) {
streamEvent = &e
}
s := e.All(f)
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})

if streamEvent == nil {
Expand All @@ -54,8 +53,7 @@ func TestSubSpecificEvent(t *testing.T) {
streamEvent = &e
}
s := e.SpecificEvent(f, &AnEvent{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})

if streamEvent == nil {
Expand All @@ -81,8 +79,7 @@ func TestSubAggregate(t *testing.T) {
streamEvent = &e
}
s := e.Aggregate(f, &anAggregate, &anOtherAggregate)
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
// update with event from the AnAggregate aggregate
e.Publish(anAggregate.AggregateRoot, []eventsourcing.Event{event})
if streamEvent == nil {
Expand All @@ -106,8 +103,7 @@ func TestSubAggregateType(t *testing.T) {
streamEvent = &e
}
s := e.AggregateType(f, &AnAggregate{}, &AnotherAggregate{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()

// update with event from the AnAggregate aggregate
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})
Expand All @@ -132,8 +128,7 @@ func TestSubSpecificEventMultiplePublish(t *testing.T) {
streamEvents = append(streamEvents, &e)
}
s := e.SpecificEvent(f, &AnEvent{}, &AnotherEvent{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})
e.Publish(AnotherAggregate{}.AggregateRoot, []eventsourcing.Event{otherEvent})

Expand Down Expand Up @@ -172,8 +167,7 @@ func TestUpdateNoneSubscribedEvent(t *testing.T) {
streamEvent = &e
}
s := e.SpecificEvent(f, &AnotherEvent{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})

if streamEvent != nil {
Expand Down Expand Up @@ -205,20 +199,15 @@ func TestManySubscribers(t *testing.T) {
streamEvent5 = append(streamEvent5, e)
}
s := e.SpecificEvent(f1, &AnotherEvent{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
s = e.SpecificEvent(f2, &AnotherEvent{}, &AnEvent{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
s = e.SpecificEvent(f3, &AnEvent{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
s = e.All(f4)
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
s = e.AggregateType(f5, &AnAggregate{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()

e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})

Expand Down Expand Up @@ -258,11 +247,11 @@ func TestParallelPublish(t *testing.T) {
streamEvent = append(streamEvent, e)
}
s := e.SpecificEvent(f1, &AnEvent{})
defer s.Unsubscribe()
defer s.Close()
s = e.SpecificEvent(f2, &AnotherEvent{})
defer s.Unsubscribe()
defer s.Close()
s = e.All(f3)
defer s.Unsubscribe()
defer s.Close()

wg := sync.WaitGroup{}
// concurrently update the event stream
Expand Down Expand Up @@ -300,24 +289,20 @@ func TestClose(t *testing.T) {
count++
}
s1 := e.All(f)
s1.Subscribe()
s2 := e.SpecificEvent(f, &AnEvent{})
s2.Subscribe()
s3 := e.AggregateType(f, &AnAggregate{})
s3.Subscribe()
s4 := e.Aggregate(f, &AnAggregate{})
s4.Subscribe()

// trigger 4 subscriptions
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})
if count != 4 {
t.Fatalf("should have received four event")
}
// close all subscriptions
s1.Unsubscribe()
s2.Unsubscribe()
s3.Unsubscribe()
s4.Unsubscribe()
s1.Close()
s2.Close()
s3.Close()
s4.Close()

// new event should not trigger closed subscriptions
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})
Expand All @@ -335,8 +320,7 @@ func TestName(t *testing.T) {
streamEvent = e
}
s := e.Name(f, "AnAggregate", "AnEvent")
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()
e.Publish(AnAggregate{}.AggregateRoot, []eventsourcing.Event{event})

if streamEvent.Version != event.Version {
Expand Down
18 changes: 6 additions & 12 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,7 @@ func TestSubscriptionAllEvent(t *testing.T) {
}
repo := eventsourcing.NewRepository(memory.Create(), nil)
s := repo.EventStream.All(f)
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()

person, err := CreatePerson("kalle")
if err != nil {
Expand All @@ -253,8 +252,7 @@ func TestSubscriptionSpecificEvent(t *testing.T) {
}
repo := eventsourcing.NewRepository(memory.Create(), nil)
s := repo.EventStream.SpecificEvent(f, &Born{}, &AgedOneYear{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()

person, err := CreatePerson("kalle")
if err != nil {
Expand All @@ -280,8 +278,7 @@ func TestSubscriptionAggregateType(t *testing.T) {
}
repo := eventsourcing.NewRepository(memory.Create(), nil)
s := repo.EventStream.AggregateType(f, &Person{})
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()

person, err := CreatePerson("kalle")
if err != nil {
Expand Down Expand Up @@ -312,8 +309,7 @@ func TestSubscriptionSpecificAggregate(t *testing.T) {
t.Fatal(err)
}
s := repo.EventStream.Aggregate(f, person)
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()

person.GrowOlder()
person.GrowOlder()
Expand Down Expand Up @@ -360,8 +356,7 @@ func TestEventChainDoesNotHang(t *testing.T) {
t.Fatal(err)
}
s := repo.EventStream.Aggregate(f, person)
s.Subscribe()
defer s.Unsubscribe()
defer s.Close()

// subscribe to all events and filter out AgedOneYear
ageCounter := 0
Expand All @@ -372,8 +367,7 @@ func TestEventChainDoesNotHang(t *testing.T) {
ageCounter++
}
})
s2.Subscribe()
defer s2.Unsubscribe()
defer s2.Close()

person.GrowOlder()
person.GrowOlder()
Expand Down

0 comments on commit fea90b2

Please sign in to comment.