diff --git a/eventstore/bbolt/bbolt.go b/eventstore/bbolt/bbolt.go index 06e2f515..19fc813d 100644 --- a/eventstore/bbolt/bbolt.go +++ b/eventstore/bbolt/bbolt.go @@ -166,15 +166,56 @@ func (e *BBolt) Save(events []eventsourcing.Event) error { return tx.Commit() } +type iterator struct { + tx *bbolt.Tx + cursor *bbolt.Cursor + serializer eventsourcing.Serializer +} + +func (i *iterator) Close() { + i.tx.Rollback() +} + +func (i *iterator) Next() (eventsourcing.Event, error) { + k, obj := i.cursor.Next() + if k == nil { + return eventsourcing.Event{}, eventsourcing.ErrNoMoreEvents + } + bEvent := boltEvent{} + err := i.serializer.Unmarshal(obj, &bEvent) + if err != nil { + return eventsourcing.Event{}, errors.New(fmt.Sprintf("could not deserialize event, %v", err)) + } + f, ok := i.serializer.Type(bEvent.AggregateType, bEvent.Reason) + if !ok { + // if the typ/reason is not register jump over the event + return i.Next() + } + eventData := f() + err = i.serializer.Unmarshal(bEvent.Data, &eventData) + if err != nil { + return eventsourcing.Event{}, errors.New(fmt.Sprintf("could not deserialize event data, %v", err)) + } + event := eventsourcing.Event{ + AggregateID: bEvent.AggregateID, + AggregateType: bEvent.AggregateType, + Version: eventsourcing.Version(bEvent.Version), + GlobalVersion: eventsourcing.Version(bEvent.GlobalVersion), + Timestamp: bEvent.Timestamp, + Metadata: bEvent.Metadata, + Data: eventData, + } + return event, nil +} + // Get aggregate events -func (e *BBolt) Get(id string, aggregateType string, afterVersion eventsourcing.Version) ([]eventsourcing.Event, error) { +func (e *BBolt) Get(id string, aggregateType string, afterVersion eventsourcing.Version) (eventsourcing.EventIterator, error) { bucketName := aggregateKey(aggregateType, id) tx, err := e.db.Begin(false) if err != nil { return nil, err } - defer tx.Rollback() evBucket := tx.Bucket([]byte(bucketName)) if evBucket == nil { @@ -182,40 +223,14 @@ func (e *BBolt) Get(id string, aggregateType string, afterVersion eventsourcing. } cursor := evBucket.Cursor() - events := make([]eventsourcing.Event, 0) firstEvent := afterVersion + 1 - - for k, obj := cursor.Seek(itob(uint64(firstEvent))); k != nil; k, obj = cursor.Next() { - bEvent := boltEvent{} - err := e.serializer.Unmarshal(obj, &bEvent) - if err != nil { - return nil, errors.New(fmt.Sprintf("could not deserialize event, %v", err)) - } - f, ok := e.serializer.Type(bEvent.AggregateType, bEvent.Reason) - if !ok { - // if the typ/reason is not register jump over the event - continue - } - eventData := f() - err = e.serializer.Unmarshal(bEvent.Data, &eventData) - if err != nil { - return nil, errors.New(fmt.Sprintf("could not deserialize event data, %v", err)) - } - event := eventsourcing.Event{ - AggregateID: bEvent.AggregateID, - AggregateType: bEvent.AggregateType, - Version: eventsourcing.Version(bEvent.Version), - GlobalVersion: eventsourcing.Version(bEvent.GlobalVersion), - Timestamp: bEvent.Timestamp, - Metadata: bEvent.Metadata, - Data: eventData, - } - events = append(events, event) + k, _ := cursor.Seek(itob(uint64(firstEvent))) + if k == nil { + return nil, errors.New("No events") } - if len(events) == 0 { - return nil, eventsourcing.ErrNoEvents - } - return events, nil + i := iterator{tx: tx, cursor: cursor, serializer: e.serializer} + return &i, nil + } // GlobalEvents return count events in order globaly from the start posistion