Skip to content

Commit

Permalink
removed delete worker pool since it is no longer needed and changed t…
Browse files Browse the repository at this point in the history
…he files delete operation to run in the background (will be replaced with job queue)
  • Loading branch information
ganigeorgiev committed Jan 12, 2023
1 parent f792a9e commit 012546e
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 54 deletions.
3 changes: 3 additions & 0 deletions apis/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/labstack/echo/v5"
"github.com/pocketbase/pocketbase/models"
Expand Down Expand Up @@ -232,6 +233,7 @@ func TestCollectionDelete(t *testing.T) {
RequestHeaders: map[string]string{
"Authorization": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6InN5d2JoZWNuaDQ2cmhtMCIsInR5cGUiOiJhZG1pbiIsImV4cCI6MjIwODk4NTI2MX0.M1m--VOqGyv0d23eeUc0r9xE8ZzHaYVmVFw1VZW6gT8",
},
Delay: 100 * time.Millisecond,
ExpectedStatus: 204,
ExpectedEvents: map[string]int{
"OnModelBeforeDelete": 1,
Expand All @@ -250,6 +252,7 @@ func TestCollectionDelete(t *testing.T) {
RequestHeaders: map[string]string{
"Authorization": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6InN5d2JoZWNuaDQ2cmhtMCIsInR5cGUiOiJhZG1pbiIsImV4cCI6MjIwODk4NTI2MX0.M1m--VOqGyv0d23eeUc0r9xE8ZzHaYVmVFw1VZW6gT8",
},
Delay: 100 * time.Millisecond,
ExpectedStatus: 204,
ExpectedEvents: map[string]int{
"OnModelBeforeDelete": 1,
Expand Down
3 changes: 3 additions & 0 deletions apis/record_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/labstack/echo/v5"
"github.com/pocketbase/pocketbase/models"
Expand Down Expand Up @@ -736,6 +737,7 @@ func TestRecordCrudDelete(t *testing.T) {
// users, [email protected]
"Authorization": "eyJhbGciOiJIUzI1NiJ9.eyJpZCI6IjRxMXhsY2xtZmxva3UzMyIsInR5cGUiOiJhdXRoUmVjb3JkIiwiY29sbGVjdGlvbklkIjoiX3BiX3VzZXJzX2F1dGhfIiwiZXhwIjoyMjA4OTg1MjYxfQ.UwD8JvkbQtXpymT09d7J6fdA0aP9g4FJ1GPh_ggEkzc",
},
Delay: 100 * time.Millisecond,
ExpectedStatus: 204,
ExpectedEvents: map[string]int{
"OnModelAfterDelete": 3, // +2 because of the external auths
Expand Down Expand Up @@ -822,6 +824,7 @@ func TestRecordCrudDelete(t *testing.T) {
RequestHeaders: map[string]string{
"Authorization": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6InN5d2JoZWNuaDQ2cmhtMCIsInR5cGUiOiJhZG1pbiIsImV4cCI6MjIwODk4NTI2MX0.M1m--VOqGyv0d23eeUc0r9xE8ZzHaYVmVFw1VZW6gT8",
},
Delay: 100 * time.Millisecond,
ExpectedStatus: 204,
ExpectedEvents: map[string]int{
"OnModelBeforeDelete": 2,
Expand Down
24 changes: 17 additions & 7 deletions core/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pocketbase/pocketbase/tools/filesystem"
"github.com/pocketbase/pocketbase/tools/hook"
"github.com/pocketbase/pocketbase/tools/mailer"
"github.com/pocketbase/pocketbase/tools/routine"
"github.com/pocketbase/pocketbase/tools/store"
"github.com/pocketbase/pocketbase/tools/subscriptions"
)
Expand Down Expand Up @@ -939,20 +940,29 @@ func (app *BaseApp) registerDefaultHooks() {

failed := fs.DeletePrefix(prefix)
if len(failed) > 0 {
return errors.New("Failed to delete the files at " + prefix)
return errors.New("failed to delete the files at " + prefix)
}

return nil
}

// delete storage files from deleted Collection, Records, etc.
// try to delete the storage files from deleted Collection, Records, etc. model
app.OnModelAfterDelete().Add(func(e *ModelEvent) error {
if m, ok := e.Model.(models.FilesManager); ok && m.BaseFilesPath() != "" {
if err := deletePrefix(m.BaseFilesPath()); err != nil && app.IsDebug() {
// non critical error - only log for debug
// (usually could happen because of S3 api limits)
log.Println(err)
}
prefix := m.BaseFilesPath()

// run in the background for "optimistic" delete to avoid
// blocking the delete transaction
//
// @todo consider creating a bg process queue so that the
// call could be "retried" in case of a failure.
routine.FireAndForget(func() {
if err := deletePrefix(prefix); err != nil && app.IsDebug() {
// non critical error - only log for debug
// (usually could happen because of S3 api limits)
log.Println(err)
}
})
}

return nil
Expand Down
62 changes: 17 additions & 45 deletions daos/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package daos
import (
"errors"
"fmt"
"math"
"strings"

"github.com/pocketbase/dbx"
Expand Down Expand Up @@ -389,8 +388,7 @@ func (dao *Dao) DeleteRecord(record *models.Record) error {
})
}

// cascadeRecordDelete triggers cascade deletion for the provided references
// and split the work to a batched set of go routines.
// cascadeRecordDelete triggers cascade deletion for the provided references.
//
// NB! This method is expected to be called inside a transaction.
func (dao *Dao) cascadeRecordDelete(mainRecord *models.Record, refs map[*models.Collection][]*schema.SchemaField) error {
Expand All @@ -400,20 +398,21 @@ func (dao *Dao) cascadeRecordDelete(mainRecord *models.Record, refs map[*models.
for _, field := range fields {
recordTableName := inflector.Columnify(refCollection.Name)
prefixedFieldName := recordTableName + "." + inflector.Columnify(field.Name)

// @todo optimize single relation lookup in v0.12+
query := dao.RecordQuery(refCollection).
Distinct(true).
LeftJoin(fmt.Sprintf(
// note: the case is used to normalize value access for single and multiple relations.
AndWhere(dbx.Not(dbx.HashExp{recordTableName + ".id": mainRecord.Id})).
InnerJoin(fmt.Sprintf(
// note: the case is used to normalize the value access
`json_each(CASE WHEN json_valid([[%s]]) THEN [[%s]] ELSE json_array([[%s]]) END) as {{%s}}`,
prefixedFieldName, prefixedFieldName, prefixedFieldName, uniqueJsonEachAlias,
), nil).
AndWhere(dbx.Not(dbx.HashExp{recordTableName + ".id": mainRecord.Id})).
AndWhere(dbx.HashExp{uniqueJsonEachAlias + ".value": mainRecord.Id})
), dbx.HashExp{uniqueJsonEachAlias + ".value": mainRecord.Id})

// trigger cascade for each 1000 rel items until there is none
batchSize := 1000
// trigger cascade for each batchSize rel items until there is none
batchSize := 4000
rows := make([]dbx.NullStringMap, 0, batchSize)
for {
rows := make([]dbx.NullStringMap, 0, batchSize)
if err := query.Limit(int64(batchSize)).All(&rows); err != nil {
return err
}
Expand All @@ -423,45 +422,18 @@ func (dao *Dao) cascadeRecordDelete(mainRecord *models.Record, refs map[*models.
break
}

perWorker := 50
workers := int(math.Ceil(float64(total) / float64(perWorker)))

batchErr := func() error {
ch := make(chan error)
defer close(ch)

for i := 0; i < workers; i++ {
var chunks []dbx.NullStringMap
if len(rows) <= perWorker {
chunks = rows
rows = nil
} else {
chunks = rows[:perWorker]
rows = rows[perWorker:]
}

go func() {
refRecords := models.NewRecordsFromNullStringMaps(refCollection, chunks)
ch <- dao.deleteRefRecords(mainRecord, refRecords, field)
}()
}

for i := 0; i < workers; i++ {
if err := <-ch; err != nil {
return err
}
}

return nil
}()

if batchErr != nil {
return batchErr
refRecords := models.NewRecordsFromNullStringMaps(refCollection, rows)

err := dao.deleteRefRecords(mainRecord, refRecords, field)
if err != nil {
return err
}

if total < batchSize {
break // no more items
}

rows = rows[:0] // keep allocated memory
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions daos/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,11 +663,11 @@ func TestDeleteRecord(t *testing.T) {
}
// ensure that the json rel fields were prefixed
joinedQueries := strings.Join(calledQueries, " ")
expectedRelManyJoin := "`demo1` LEFT JOIN json_each(CASE WHEN json_valid([[demo1.rel_many]]) THEN [[demo1.rel_many]] ELSE json_array([[demo1.rel_many]]) END)"
expectedRelManyJoin := "`demo1` INNER JOIN json_each(CASE WHEN json_valid([[demo1.rel_many]]) THEN [[demo1.rel_many]] ELSE json_array([[demo1.rel_many]]) END)"
if !strings.Contains(joinedQueries, expectedRelManyJoin) {
t.Fatalf("(rec3) Expected the cascade delete to call the query \n%v, got \n%v", expectedRelManyJoin, calledQueries)
}
expectedRelOneJoin := "`demo1` LEFT JOIN json_each(CASE WHEN json_valid([[demo1.rel_one]]) THEN [[demo1.rel_one]] ELSE json_array([[demo1.rel_one]]) END)"
expectedRelOneJoin := "`demo1` INNER JOIN json_each(CASE WHEN json_valid([[demo1.rel_one]]) THEN [[demo1.rel_one]] ELSE json_array([[demo1.rel_one]]) END)"
if !strings.Contains(joinedQueries, expectedRelOneJoin) {
t.Fatalf("(rec3) Expected the cascade delete to call the query \n%v, got \n%v", expectedRelOneJoin, calledQueries)
}
Expand Down

0 comments on commit 012546e

Please sign in to comment.