Skip to content

Commit

Permalink
updated random_test
Browse files Browse the repository at this point in the history
  • Loading branch information
ganigeorgiev committed Dec 12, 2022
1 parent 55b439c commit 0eeae9d
Show file tree
Hide file tree
Showing 5 changed files with 461 additions and 284 deletions.
287 changes: 91 additions & 196 deletions daos/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,96 +362,128 @@ func (dao *Dao) DeleteRecord(record *models.Record) error {
// run all consequent DeleteRecord requests synchroniously
// to minimize SQLITE_BUSY errors
if len(refs) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
if err := dao.Block(ctx); err != nil {
return err
// ignore blocking and try to run directly...
} else {
defer dao.Continue()
}
defer dao.Continue()
}

return dao.RunInTransaction(func(txDao *Dao) error {
// always delete the record first to ensure that there will be no "A<->B"
// relations to prevent deadlock when calling DeleteRecord recursively
// manually trigger delete on any linked external auth to ensure
// that the `OnModel*` hooks are triggered.
//
// note: the select is outside of the transaction to minimize
// SQLITE_BUSY errors when mixing read&write in a single transaction
if record.Collection().IsAuth() {
externalAuths, err := dao.FindAllExternalAuthsByRecord(record)
if err != nil {
return err
}
for _, auth := range externalAuths {
if err := txDao.DeleteExternalAuth(auth); err != nil {
return err
}
}
}

// delete the record before the relation references to ensure that there
// will be no "A<->B" relations to prevent deadlock when calling DeleteRecord recursively
if err := txDao.Delete(record); err != nil {
return err
}

// check if related records has to be deleted (if `CascadeDelete` is set)
// OR
// just unset the record id from any relation field values (if they are not required)
uniqueJsonEachAlias := "__je__" + security.PseudorandomString(4)
for refCollection, fields := range refs {
for _, field := range fields {
// fetch all referenced records
return txDao.cascadeRecordDelete(record, refs)
})
}

// cascadeRecordDelete triggers cascade deletion for the provided references
// and split the work to a batched set of go routines.
//
// NB! This method is expected to be called inside a transaction.
func (dao *Dao) cascadeRecordDelete(mainRecord *models.Record, refs map[*models.Collection][]*schema.SchemaField) error {
uniqueJsonEachAlias := "__je__" + security.PseudorandomString(4)

for refCollection, fields := range refs {
for _, field := range fields {
recordTableName := inflector.Columnify(refCollection.Name)
prefixedFieldName := recordTableName + "." + inflector.Columnify(field.Name)
query := dao.RecordQuery(refCollection).
Distinct(true).
LeftJoin(fmt.Sprintf(
// note: the case is used to normalize value access for single and multiple relations.
`json_each(CASE WHEN json_valid([[%s]]) THEN [[%s]] ELSE json_array([[%s]]) END) as {{%s}}`,
prefixedFieldName, prefixedFieldName, prefixedFieldName, uniqueJsonEachAlias,
), nil).
AndWhere(dbx.Not(dbx.HashExp{recordTableName + ".id": mainRecord.Id})).
AndWhere(dbx.HashExp{uniqueJsonEachAlias + ".value": mainRecord.Id})

// trigger cascade for each 1000 rel items until there is none
batchSize := 1000
for {
rows := []dbx.NullStringMap{}
recordTableName := inflector.Columnify(refCollection.Name)
prefixedFieldName := recordTableName + "." + inflector.Columnify(field.Name)
err := txDao.RecordQuery(refCollection).
Distinct(true).
LeftJoin(fmt.Sprintf(
// note: the case is used to normalize value access for single and multiple relations.
`json_each(CASE WHEN json_valid([[%s]]) THEN [[%s]] ELSE json_array([[%s]]) END) as {{%s}}`,
prefixedFieldName, prefixedFieldName, prefixedFieldName, uniqueJsonEachAlias,
), nil).
AndWhere(dbx.Not(dbx.HashExp{recordTableName + ".id": record.Id})).
AndWhere(dbx.HashExp{uniqueJsonEachAlias + ".value": record.Id}).
All(&rows)
if err != nil {
if err := query.Limit(int64(batchSize)).All(&rows); err != nil {
return err
}

total := len(rows)

if total == 0 {
continue
break
}

ch := make(chan error)
perPage := 200
pages := int(math.Ceil(float64(total) / float64(perPage)))

for i := 0; i < pages; i++ {
var chunks []dbx.NullStringMap
if len(rows) <= perPage {
chunks = rows[0:]
rows = nil
} else {
chunks = rows[0:perPage]
rows = rows[perPage:]
batchErr := func() error {
ch := make(chan error)
defer close(ch)

for i := 0; i < pages; i++ {
var chunks []dbx.NullStringMap
if len(rows) <= perPage {
chunks = rows[0:]
rows = nil
} else {
chunks = rows[0:perPage]
rows = rows[perPage:]
}

go func() {
refRecords := models.NewRecordsFromNullStringMaps(refCollection, chunks)
ch <- dao.deleteRefRecords(mainRecord, refRecords, field)
}()
}

go func() {
refRecords := models.NewRecordsFromNullStringMaps(refCollection, chunks)
ch <- txDao.deleteRefRecords(record, refRecords, field)
}()
}

for i := 0; i < pages; i++ {
if err := <-ch; err != nil {
close(ch)
return err
for i := 0; i < pages; i++ {
if err := <-ch; err != nil {
return err
}
}

return nil
}()

if batchErr != nil {
return batchErr
}
close(ch)
}
}

// delete linked external auths
if record.Collection().IsAuth() {
_, err = txDao.DB().Delete((&models.ExternalAuth{}).TableName(), dbx.HashExp{
"collectionId": record.Collection().Id,
"recordId": record.Id,
}).Execute()
if err != nil {
return err
if total < batchSize {
break // no more items
}
}
}
}

return nil
})
return nil
}

// deleteRefRecords checks if related records has to be deleted (if `CascadeDelete` is set)
// OR
// just unset the record id from any relation field values (if they are not required).
//
// NB! This method is expected to be called inside a transaction.
func (dao *Dao) deleteRefRecords(mainRecord *models.Record, refRecords []*models.Record, field *schema.SchemaField) error {
options, _ := field.Options.(*schema.RelationOptions)
if options == nil {
Expand Down Expand Up @@ -492,140 +524,3 @@ func (dao *Dao) deleteRefRecords(mainRecord *models.Record, refRecords []*models

return nil
}

// SyncRecordTableSchema compares the two provided collections
// and applies the necessary related record table changes.
//
// If `oldCollection` is null, then only `newCollection` is used to create the record table.
func (dao *Dao) SyncRecordTableSchema(newCollection *models.Collection, oldCollection *models.Collection) error {
// create
if oldCollection == nil {
cols := map[string]string{
schema.FieldNameId: "TEXT PRIMARY KEY NOT NULL",
schema.FieldNameCreated: "TEXT DEFAULT '' NOT NULL",
schema.FieldNameUpdated: "TEXT DEFAULT '' NOT NULL",
}

if newCollection.IsAuth() {
cols[schema.FieldNameUsername] = "TEXT NOT NULL"
cols[schema.FieldNameEmail] = "TEXT DEFAULT '' NOT NULL"
cols[schema.FieldNameEmailVisibility] = "BOOLEAN DEFAULT FALSE NOT NULL"
cols[schema.FieldNameVerified] = "BOOLEAN DEFAULT FALSE NOT NULL"
cols[schema.FieldNameTokenKey] = "TEXT NOT NULL"
cols[schema.FieldNamePasswordHash] = "TEXT NOT NULL"
cols[schema.FieldNameLastResetSentAt] = "TEXT DEFAULT '' NOT NULL"
cols[schema.FieldNameLastVerificationSentAt] = "TEXT DEFAULT '' NOT NULL"
}

// ensure that the new collection has an id
if !newCollection.HasId() {
newCollection.RefreshId()
newCollection.MarkAsNew()
}

tableName := newCollection.Name

// add schema field definitions
for _, field := range newCollection.Schema.Fields() {
cols[field.Name] = field.ColDefinition()
}

// create table
if _, err := dao.DB().CreateTable(tableName, cols).Execute(); err != nil {
return err
}

// add named index on the base `created` column
if _, err := dao.DB().CreateIndex(tableName, "_"+newCollection.Id+"_created_idx", "created").Execute(); err != nil {
return err
}

// add named unique index on the email and tokenKey columns
if newCollection.IsAuth() {
_, err := dao.DB().NewQuery(fmt.Sprintf(
`
CREATE UNIQUE INDEX _%s_username_idx ON {{%s}} ([[username]]);
CREATE UNIQUE INDEX _%s_email_idx ON {{%s}} ([[email]]) WHERE [[email]] != '';
CREATE UNIQUE INDEX _%s_tokenKey_idx ON {{%s}} ([[tokenKey]]);
`,
newCollection.Id, tableName,
newCollection.Id, tableName,
newCollection.Id, tableName,
)).Execute()
if err != nil {
return err
}
}

return nil
}

// update
return dao.RunInTransaction(func(txDao *Dao) error {
oldTableName := oldCollection.Name
newTableName := newCollection.Name
oldSchema := oldCollection.Schema
newSchema := newCollection.Schema

// check for renamed table
if !strings.EqualFold(oldTableName, newTableName) {
_, err := txDao.DB().RenameTable(oldTableName, newTableName).Execute()
if err != nil {
return err
}
}

// check for deleted columns
for _, oldField := range oldSchema.Fields() {
if f := newSchema.GetFieldById(oldField.Id); f != nil {
continue // exist
}

_, err := txDao.DB().DropColumn(newTableName, oldField.Name).Execute()
if err != nil {
return err
}
}

// check for new or renamed columns
toRename := map[string]string{}
for _, field := range newSchema.Fields() {
oldField := oldSchema.GetFieldById(field.Id)
// Note:
// We are using a temporary column name when adding or renaming columns
// to ensure that there are no name collisions in case there is
// names switch/reuse of existing columns (eg. name, title -> title, name).
// This way we are always doing 1 more rename operation but it provides better dev experience.

if oldField == nil {
tempName := field.Name + security.PseudorandomString(5)
toRename[tempName] = field.Name

// add
_, err := txDao.DB().AddColumn(newTableName, tempName, field.ColDefinition()).Execute()
if err != nil {
return err
}
} else if oldField.Name != field.Name {
tempName := field.Name + security.PseudorandomString(5)
toRename[tempName] = field.Name

// rename
_, err := txDao.DB().RenameColumn(newTableName, oldField.Name, tempName).Execute()
if err != nil {
return err
}
}
}

// set the actual columns name
for tempName, actualName := range toRename {
_, err := txDao.DB().RenameColumn(newTableName, tempName, actualName).Execute()
if err != nil {
return err
}
}

return nil
})
}
Loading

0 comments on commit 0eeae9d

Please sign in to comment.