Skip to content

Commit

Permalink
id → aggregate_id (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
peschkaj authored Aug 20, 2022
1 parent ce25fe3 commit 014c4db
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 10 deletions.
6 changes: 3 additions & 3 deletions eventstore/sql/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package sql

import "context"

const createTable = `CREATE TABLE events (seq INTEGER PRIMARY KEY AUTOINCREMENT, id UUID NOT NULL, version INTEGER, reason VARCHAR, type VARCHAR, timestamp VARCHAR, data BLOB, metadata BLOB);`
const createTable = `CREATE TABLE events (seq INTEGER PRIMARY KEY AUTOINCREMENT, aggregate_id UUID NOT NULL, version INTEGER, reason VARCHAR, type VARCHAR, timestamp VARCHAR, data BLOB, metadata BLOB);`

// Migrate the database
func (s *SQL) Migrate() error {
sqlStmt := []string{
createTable,
`CREATE UNIQUE INDEX id_type_version ON events(id, type, version);`,
`CREATE INDEX id_type ON events (id, type);`,
`CREATE UNIQUE INDEX id_type_version ON events(aggregate_id, type, version);`,
`CREATE INDEX id_type ON events (aggregate_id, type);`,
}
return s.migrate(sqlStmt)
}
Expand Down
8 changes: 4 additions & 4 deletions eventstore/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *SQL) Save(events []eventsourcing.Event) error {

var currentVersion eventsourcing.Version
var version int
selectStm := `SELECT version FROM events WHERE id=? AND type=? ORDER BY version DESC LIMIT 1`
selectStm := `SELECT version FROM events WHERE aggregate_id=? AND type=? ORDER BY version DESC LIMIT 1`
err = tx.QueryRow(selectStm, aggregateID, aggregateType).Scan(&version)
if err != nil && err != sql.ErrNoRows {
return err
Expand All @@ -67,7 +67,7 @@ func (s *SQL) Save(events []eventsourcing.Event) error {
}

var lastInsertedID int64
insert := `INSERT INTO events (id, version, reason, type, timestamp, data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)`
insert := `INSERT INTO events (aggregate_id, version, reason, type, timestamp, data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)`
for i, event := range events {
var e, m []byte

Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *SQL) Save(events []eventsourcing.Event) error {

// Get the events from database
func (s *SQL) Get(ctx context.Context, id uuid.UUID, aggregateType string, afterVersion eventsourcing.Version) (eventsourcing.EventIterator, error) {
selectStm := `SELECT seq, id, version, reason, type, timestamp, data, metadata FROM events WHERE id = ? AND type = ? AND version > ? ORDER BY version ASC`
selectStm := `SELECT seq, aggregate_id, version, reason, type, timestamp, data, metadata FROM events WHERE aggregate_id = ? AND type = ? AND version > ? ORDER BY version ASC`
rows, err := s.db.QueryContext(ctx, selectStm, id, aggregateType, afterVersion)
if err != nil {
return nil, err
Expand All @@ -110,7 +110,7 @@ func (s *SQL) Get(ctx context.Context, id uuid.UUID, aggregateType string, after

// GlobalEvents return count events in order globaly from the start posistion
func (s *SQL) GlobalEvents(start, count uint64) ([]eventsourcing.Event, error) {
selectStm := `SELECT seq, id, version, reason, type, timestamp, data, metadata FROM events WHERE seq >= ? ORDER BY seq ASC LIMIT ?`
selectStm := `SELECT seq, aggregate_id, version, reason, type, timestamp, data, metadata FROM events WHERE seq >= ? ORDER BY seq ASC LIMIT ?`
rows, err := s.db.Query(selectStm, start, count)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/go-gorp/gorp v2.0.0+incompatible h1:dIQPsBtl6/H1MjVseWuWPXa7ET4p6Dve4j3Hg+UjqYw=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
4 changes: 2 additions & 2 deletions snapshotstore/sql/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package sql

import "context"

const createTable = `CREATE TABLE snapshots (id UUID NOT NULL, type VARCHAR, version INTEGER, global_version INTEGER, state BLOB);`
const createTable = `CREATE TABLE snapshots (aggregate_id UUID NOT NULL, type VARCHAR, version INTEGER, global_version INTEGER, state BLOB);`

// Migrate the database
func (s *SQL) Migrate() error {
sqlStmt := []string{
createTable,
`CREATE UNIQUE INDEX id_type ON snapshots (id, type);`,
`CREATE UNIQUE INDEX id_type ON snapshots (aggregate_id, type);`,
}
return s.migrate(sqlStmt)
}
Expand Down
2 changes: 1 addition & 1 deletion snapshotstore/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *SQL) Get(ctx context.Context, id uuid.UUID, typ string) (eventsourcing.
}
defer tx.Rollback()

statement := `SELECT state, version, global_version FROM snapshots WHERE id=$1 AND type=$2 LIMIT 1`
statement := `SELECT state, version, global_version FROM snapshots WHERE aggregate_id=$1 AND type=$2 LIMIT 1`
var state []byte
var version uint64
var globalVersion uint64
Expand Down

0 comments on commit 014c4db

Please sign in to comment.