Skip to content

Commit

Permalink
rewrite the readme (Subscribe)
Browse files Browse the repository at this point in the history
  • Loading branch information
hallgren committed Nov 24, 2019
1 parent 9ba23d1 commit ed989ad
Showing 1 changed file with 20 additions and 25 deletions.
45 changes: 20 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,41 +207,36 @@ err := j.Register(&Person{}, &Born{}, &AgedOneYear{})

The unsafe serializer stores the underlying memory representation of a struct directly. This makes it as its name implies, unsafe to use if you are unsure what you are doing. [Here](https://youtu.be/4xB46Xl9O9Q?t=610) is the video that explains the reason for this serializer.

### Event Stream
### Event Subscription

The repository expose the `EventStream(events ...interface{}) observer.Stream` function that makes it possible to subscribe on saved events.
The returned stream will collect all events that has occurred after the call to the function and its possible to iterate over them via the functions from the [go-observer](https://github.com/imkira/go-observer) pkg (that is the pkg we use to accomplish this).
The repository expose the `Subscribe(func(e eventsourcing.Event), events ...interface{})` function that makes it possible to subscribe on saved events in realtime.
The function `func(e eventcourcing.Event)` will be called when and subscribed to event is saved in the repository.

If no events are included in the input `repo.EventStream()` all saved events will be present in the stream.
When events `repo.EventStream(&FrequentFlierAccountCreated{}, &StatusMatched{})` only those events with that type will be present in the stream.
If no events are included in the input `repo.Subscribe(func(e eventsourcing.Event))` all saved events will be exposed via the `func(e eventsourcing.Event)` function.
When events `repo.Subscribe(func(e eventsourcing.Event),&FrequentFlierAccountCreated{}, &StatusMatched{})` only those events with that type will be exposed via the function.
There is no restrictions that the events need to come from the same aggregate type, you can mix and match as you please.

The stream is realtime and events that are saved before the call to the EventStream function will not be present on the stream. If the application
depends on this functionality make sure to call the function before any events are saved.
The subscription is realtime and events that are saved before the call to the Subscribe function will not be exposed via the `func(e eventsourcing.Event)` function. If the application
depends on this functionality make sure to call the `repo.Subscribe` function before any events are saved.

The event stream enables the application to make use of the reactive patterns and to make it more decoupled. Check out the [Reactive Manifesto](https://www.reactivemanifesto.org/)
The event subscription enables the application to make use of the reactive patterns and to make it more decoupled. Check out the [Reactive Manifesto](https://www.reactivemanifesto.org/)
for more detailed information.

Example on how to setup the event stream and consume the event `FrequentFlierAccountCreated`
Example on how to setup the event subscription and consume the event `FrequentFlierAccountCreated`

```go
// Setup a memory based repository
repo := eventsourcing.NewRepository(memory.Create(unsafe.New()), nil)
stream := repo.EventStream()

// Read the event stream async
go func() {
for {
// wait for an event to be accessible on the stream
event := stream.WaitNext().(eventsourcing.Event)
// use similar switch as in the Transition function from the aggregate
switch e := event.Data.(type) {
case *FrequentFlierAccountCreated:
// e now have type info
fmt.Println(e)
}
}
}()
f := func(e eventsourcing.Event) {
switch e := event.Data.(type) {
case *FrequentFlierAccountCreated:
// e now have type info
fmt.Println(e)
}
}

// subscribe to all events
repo.Subscribe(f)
```

## Custom made components
Expand Down Expand Up @@ -289,4 +284,4 @@ type snapshotSerializer interface {
SerializeSnapshot(interface{}) ([]byte, error)
DeserializeSnapshot(data []byte, a interface{}) error
}
```
```

0 comments on commit ed989ad

Please sign in to comment.