Skip to content

Commit

Permalink
Move category creation and UUID update to a more performant data migr…
Browse files Browse the repository at this point in the history
…ation (mattermost-community#3437)

* Move category creation and UUID update to a more performant data migration

* Fix linter

* Fix linter
  • Loading branch information
mgdelacroix authored Jul 27, 2022
1 parent a129dbc commit 6616a16
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 152 deletions.
251 changes: 155 additions & 96 deletions server/services/store/sqlstore/data_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
)

const (
// we group the inserts on batches of 1000 because PostgreSQL
// supports a limit of around 64K values (not rows) on an insert
// query, so we want to stay safely below.
CategoryInsertBatch = 1000

TemplatesToTeamsMigrationKey = "TemplatesToTeamsMigrationComplete"
UniqueIDsMigrationKey = "UniqueIDsMigrationComplete"
CategoryUUIDIDMigrationKey = "CategoryUuidIdMigrationComplete"
Expand Down Expand Up @@ -122,6 +127,10 @@ func (s *SQLStore) runUniqueIDsMigration() error {
return nil
}

// runCategoryUUIDIDMigration takes care of deriving the categories
// from the boards and its memberships. The name references UUID
// because of the preexisting purpose of this migration, and has been
// preserved for compatibility with already migrated instances.
func (s *SQLStore) runCategoryUUIDIDMigration() error {
setting, err := s.GetSystemSetting(CategoryUUIDIDMigrationKey)
if err != nil {
Expand All @@ -140,146 +149,196 @@ func (s *SQLStore) runCategoryUUIDIDMigration() error {
return txErr
}

if err := s.updateCategoryIDs(tx); err != nil {
return err
}
if s.isPlugin {
if err := s.createCategories(tx); err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
s.logger.Error("category UUIDs insert categories transaction rollback error", mlog.Err(rollbackErr), mlog.String("methodName", "setSystemSetting"))
}
return err
}

if err := s.updateCategoryBlocksIDs(tx); err != nil {
return err
if err := s.createCategoryBoards(tx); err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
s.logger.Error("category UUIDs insert category boards transaction rollback error", mlog.Err(rollbackErr), mlog.String("methodName", "setSystemSetting"))
}
return err
}
}

if err := s.setSystemSetting(tx, CategoryUUIDIDMigrationKey, strconv.FormatBool(true)); err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
s.logger.Error("category IDs transaction rollback error", mlog.Err(rollbackErr), mlog.String("methodName", "setSystemSetting"))
s.logger.Error("category UUIDs transaction rollback error", mlog.Err(rollbackErr), mlog.String("methodName", "setSystemSetting"))
}
return fmt.Errorf("cannot mark migration as completed: %w", err)
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("cannot commit category IDs transaction: %w", err)
return fmt.Errorf("cannot commit category UUIDs transaction: %w", err)
}

s.logger.Debug("category IDs migration finished successfully")
s.logger.Debug("category UUIDs migration finished successfully")
return nil
}

func (s *SQLStore) updateCategoryIDs(db sq.BaseRunner) error {
// fetch all category IDs
oldCategoryIDs, err := s.getIDs(db, "categories")
if err != nil {
return err
}

// map old category ID to new ID
categoryIDs := map[string]string{}
for _, oldID := range oldCategoryIDs {
newID := utils.NewID(utils.IDTypeNone)
categoryIDs[oldID] = newID
}

// update for each category ID.
// Update the new ID in category table,
// and update corresponding rows in category boards table.
for oldID, newID := range categoryIDs {
if err := s.updateCategoryID(db, oldID, newID); err != nil {
return err
}
}

return nil
}

func (s *SQLStore) getIDs(db sq.BaseRunner, table string) ([]string, error) {
func (s *SQLStore) createCategories(db sq.BaseRunner) error {
rows, err := s.getQueryBuilder(db).
Select("id").
From(s.tablePrefix + table).
Select("c.DisplayName, cm.UserId, c.TeamId, cm.ChannelId").
From(s.tablePrefix + "boards boards").
Join("ChannelMembers cm on boards.channel_id = cm.ChannelId").
Join("Channels c on cm.ChannelId = c.id and (c.Type = 'O' or c.Type = 'P')").
GroupBy("cm.UserId, c.TeamId, cm.ChannelId, c.DisplayName").
Query()

if err != nil {
s.logger.Error("getIDs error", mlog.String("table", table), mlog.Err(err))
return nil, err
s.logger.Error("get boards data error", mlog.Err(err))
return err
}
defer s.CloseRows(rows)

var categoryIDs []string
initQuery := func() sq.InsertBuilder {
return s.getQueryBuilder(db).
Insert(s.tablePrefix+"categories").
Columns(
"id",
"name",
"user_id",
"team_id",
"channel_id",
"create_at",
"update_at",
"delete_at",
)
}
// query will accumulate the insert values until the limit is
// reached, and then it will be stored and reset
query := initQuery()
// queryList stores those queries that already reached the limit
// to be run when all the data is processed
queryList := []sq.InsertBuilder{}
counter := 0
now := model.GetMillis()

for rows.Next() {
var id string
err := rows.Scan(&id)
var displayName string
var userID string
var teamID string
var channelID string

err := rows.Scan(
&displayName,
&userID,
&teamID,
&channelID,
)
if err != nil {
s.logger.Error("getIDs scan row error", mlog.String("table", table), mlog.Err(err))
return nil, err
return fmt.Errorf("cannot scan result while trying to create categories: %w", err)
}

categoryIDs = append(categoryIDs, id)
query = query.Values(
utils.NewID(utils.IDTypeNone),
displayName,
userID,
teamID,
channelID,
now,
0,
0,
)

counter++
if counter%CategoryInsertBatch == 0 {
queryList = append(queryList, query)
query = initQuery()
}
}

return categoryIDs, nil
}

func (s *SQLStore) updateCategoryID(db sq.BaseRunner, oldID, newID string) error {
// update in category table
_, err := s.getQueryBuilder(db).
Update(s.tablePrefix+"categories").
Set("id", newID).
Where(sq.Eq{"id": oldID}).
Exec()

if err != nil {
s.logger.Error("updateCategoryID update category error", mlog.Err(err))
return err
if counter%CategoryInsertBatch != 0 {
queryList = append(queryList, query)
}

// update category boards table
_, err = s.getQueryBuilder(db).
Update(s.tablePrefix+"category_boards").
Set("category_id", newID).
Where(sq.Eq{"category_id": oldID}).
Exec()

if err != nil {
s.logger.Error("updateCategoryID update category boards error", mlog.Err(err))
return err
for _, q := range queryList {
if _, err := q.Exec(); err != nil {
return fmt.Errorf("cannot create category values: %w", err)
}
}

return nil
}

func (s *SQLStore) updateCategoryBlocksIDs(db sq.BaseRunner) error {
// fetch all category IDs
oldCategoryIDs, err := s.getIDs(db, "category_boards")
func (s *SQLStore) createCategoryBoards(db sq.BaseRunner) error {
rows, err := s.getQueryBuilder(db).
Select("categories.user_id, categories.id, boards.id").
From(s.tablePrefix + "categories categories").
Join(s.tablePrefix + "boards boards on categories.channel_id = boards.channel_id AND boards.is_template = false").
Query()

if err != nil {
s.logger.Error("get categories data error", mlog.Err(err))
return err
}
defer s.CloseRows(rows)

// map old category ID to new ID
categoryIDs := map[string]string{}
for _, oldID := range oldCategoryIDs {
newID := utils.NewID(utils.IDTypeNone)
categoryIDs[oldID] = newID
}
initQuery := func() sq.InsertBuilder {
return s.getQueryBuilder(db).
Insert(s.tablePrefix+"category_boards").
Columns(
"id",
"user_id",
"category_id",
"board_id",
"create_at",
"update_at",
"delete_at",
)
}
// query will accumulate the insert values until the limit is
// reached, and then it will be stored and reset
query := initQuery()
// queryList stores those queries that already reached the limit
// to be run when all the data is processed
queryList := []sq.InsertBuilder{}
counter := 0
now := model.GetMillis()

// update for each category ID.
// Update the new ID in category table,
// and update corresponding rows in category boards table.
for oldID, newID := range categoryIDs {
if err := s.updateCategoryBlocksID(db, oldID, newID); err != nil {
return err
for rows.Next() {
var userID string
var categoryID string
var boardID string

err := rows.Scan(
&userID,
&categoryID,
&boardID,
)
if err != nil {
return fmt.Errorf("cannot scan result while trying to create category boards: %w", err)
}

query = query.Values(
utils.NewID(utils.IDTypeNone),
userID,
categoryID,
boardID,
now,
0,
0,
)

counter++
if counter%CategoryInsertBatch == 0 {
queryList = append(queryList, query)
query = initQuery()
}
}
return nil
}

func (s *SQLStore) updateCategoryBlocksID(db sq.BaseRunner, oldID, newID string) error {
// update in category table
_, err := s.getQueryBuilder(db).
Update(s.tablePrefix+"category_boards").
Set("id", newID).
Where(sq.Eq{"id": oldID}).
Exec()
if counter%CategoryInsertBatch != 0 {
queryList = append(queryList, query)
}

if err != nil {
s.logger.Error("updateCategoryBlocksID update category error", mlog.Err(err))
return err
for _, q := range queryList {
if _, err := q.Exec(); err != nil {
return fmt.Errorf("cannot create category boards values: %w", err)
}
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,3 @@ CREATE TABLE IF NOT EXISTS {{.prefix}}categories (
) {{if .mysql}}DEFAULT CHARACTER SET utf8mb4{{end}};

CREATE INDEX idx_categories_user_id_team_id ON {{.prefix}}categories(user_id, team_id);

{{if .plugin}}
INSERT INTO {{.prefix}}categories(
id,
name,
user_id,
team_id,
channel_id,
create_at,
update_at,
delete_at
)
SELECT
{{ if .postgres }}
REPLACE(uuid_in(md5(random()::text || clock_timestamp()::text)::cstring)::varchar, '-', ''),
{{ end }}
{{ if .mysql }}
UUID(),
{{ end }}
c.DisplayName,
cm.UserId,
c.TeamId,
cm.ChannelId,
{{if .postgres}}(extract(epoch from now())*1000)::bigint,{{end}}
{{if .mysql}}UNIX_TIMESTAMP() * 1000,{{end}}
0,
0
FROM
{{.prefix}}boards boards
JOIN ChannelMembers cm on boards.channel_id = cm.ChannelId
JOIN Channels c on cm.ChannelId = c.id and (c.Type = 'O' or c.Type = 'P')
GROUP BY cm.UserId, c.TeamId, cm.ChannelId, c.DisplayName;
{{end}}
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,6 @@ CREATE TABLE IF NOT EXISTS {{.prefix}}category_boards (
update_at BIGINT,
delete_at BIGINT,
PRIMARY KEY (id)
) {{if .mysql}}DEFAULT CHARACTER SET utf8mb4{{end}};
) {{if .mysql}}DEFAULT CHARACTER SET utf8mb4{{end}};

CREATE INDEX idx_categoryboards_category_id ON {{.prefix}}category_boards(category_id);

{{if .plugin}}
INSERT INTO {{.prefix}}category_boards(id, user_id, category_id, board_id, create_at, update_at, delete_at)
SELECT
{{ if .postgres }}
REPLACE(uuid_in(md5(random()::text || clock_timestamp()::text)::cstring)::varchar, '-', ''),
{{ end }}
{{ if .mysql }}
UUID(),
{{ end }}
{{.prefix}}categories.user_id,
{{.prefix}}categories.id,
{{.prefix}}boards.id,
{{if .postgres}}(extract(epoch from now())*1000)::bigint,{{end}}
{{if .mysql}}UNIX_TIMESTAMP() * 1000,{{end}}
0,
0
FROM
{{.prefix}}categories
JOIN {{.prefix}}boards ON {{.prefix}}categories.channel_id = {{.prefix}}boards.channel_id
AND {{.prefix}}boards.is_template = false;
{{end}}

0 comments on commit 6616a16

Please sign in to comment.