Skip to content

Commit

Permalink
none working bbolt with iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
hallgrenx committed Jan 14, 2022
1 parent cdc72b2 commit 67940b6
Showing 1 changed file with 49 additions and 34 deletions.
83 changes: 49 additions & 34 deletions eventstore/bbolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,56 +166,71 @@ func (e *BBolt) Save(events []eventsourcing.Event) error {
return tx.Commit()
}

type iterator struct {
tx *bbolt.Tx
cursor *bbolt.Cursor
serializer eventsourcing.Serializer
}

func (i *iterator) Close() {
i.tx.Rollback()
}

func (i *iterator) Next() (eventsourcing.Event, error) {
k, obj := i.cursor.Next()
if k == nil {
return eventsourcing.Event{}, eventsourcing.ErrNoMoreEvents
}
bEvent := boltEvent{}
err := i.serializer.Unmarshal(obj, &bEvent)
if err != nil {
return eventsourcing.Event{}, errors.New(fmt.Sprintf("could not deserialize event, %v", err))
}
f, ok := i.serializer.Type(bEvent.AggregateType, bEvent.Reason)
if !ok {
// if the typ/reason is not register jump over the event
return i.Next()
}
eventData := f()
err = i.serializer.Unmarshal(bEvent.Data, &eventData)
if err != nil {
return eventsourcing.Event{}, errors.New(fmt.Sprintf("could not deserialize event data, %v", err))
}
event := eventsourcing.Event{
AggregateID: bEvent.AggregateID,
AggregateType: bEvent.AggregateType,
Version: eventsourcing.Version(bEvent.Version),
GlobalVersion: eventsourcing.Version(bEvent.GlobalVersion),
Timestamp: bEvent.Timestamp,
Metadata: bEvent.Metadata,
Data: eventData,
}
return event, nil
}

// Get aggregate events
func (e *BBolt) Get(id string, aggregateType string, afterVersion eventsourcing.Version) ([]eventsourcing.Event, error) {
func (e *BBolt) Get(id string, aggregateType string, afterVersion eventsourcing.Version) (eventsourcing.EventIterator, error) {
bucketName := aggregateKey(aggregateType, id)

tx, err := e.db.Begin(false)
if err != nil {
return nil, err
}
defer tx.Rollback()

evBucket := tx.Bucket([]byte(bucketName))
if evBucket == nil {
return nil, eventsourcing.ErrNoEvents
}

cursor := evBucket.Cursor()
events := make([]eventsourcing.Event, 0)
firstEvent := afterVersion + 1

for k, obj := cursor.Seek(itob(uint64(firstEvent))); k != nil; k, obj = cursor.Next() {
bEvent := boltEvent{}
err := e.serializer.Unmarshal(obj, &bEvent)
if err != nil {
return nil, errors.New(fmt.Sprintf("could not deserialize event, %v", err))
}
f, ok := e.serializer.Type(bEvent.AggregateType, bEvent.Reason)
if !ok {
// if the typ/reason is not register jump over the event
continue
}
eventData := f()
err = e.serializer.Unmarshal(bEvent.Data, &eventData)
if err != nil {
return nil, errors.New(fmt.Sprintf("could not deserialize event data, %v", err))
}
event := eventsourcing.Event{
AggregateID: bEvent.AggregateID,
AggregateType: bEvent.AggregateType,
Version: eventsourcing.Version(bEvent.Version),
GlobalVersion: eventsourcing.Version(bEvent.GlobalVersion),
Timestamp: bEvent.Timestamp,
Metadata: bEvent.Metadata,
Data: eventData,
}
events = append(events, event)
k, _ := cursor.Seek(itob(uint64(firstEvent)))
if k == nil {
return nil, errors.New("No events")
}
if len(events) == 0 {
return nil, eventsourcing.ErrNoEvents
}
return events, nil
i := iterator{tx: tx, cursor: cursor, serializer: e.serializer}
return &i, nil

}

// GlobalEvents return count events in order globaly from the start posistion
Expand Down

0 comments on commit 67940b6

Please sign in to comment.