forked from instructure/canvas-lms
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
bump inst-jobs and switchman-inst-jobs
refs DE-923 Change-Id: I3720242a11053fe7a6b3e539e159af1df5478686 Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/284446 Tested-by: Service Cloud Jenkins <[email protected]> Reviewed-by: Jacob Burroughs <[email protected]> Migration-Review: Jacob Burroughs <[email protected]> QA-Review: Aaron Ogata <[email protected]> Product-Review: Aaron Ogata <[email protected]>
- Loading branch information
1 parent
2130e27
commit 12b1626
Showing
5 changed files
with
418 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
52 changes: 52 additions & 0 deletions
52
db/migrate/20220127091200_fix_singleton_unique_constraint.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
# frozen_string_literal: true | ||
|
||
# Copyright (C) 2021 - present Instructure, Inc. | ||
# | ||
# This file is part of Canvas. | ||
# | ||
# Canvas is free software: you can redistribute it and/or modify it under | ||
# the terms of the GNU Affero General Public License as published by the Free | ||
# Software Foundation, version 3 of the License. | ||
# | ||
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY | ||
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR | ||
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more | ||
# details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License along | ||
# with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
# rubocop:disable Rails/SquishedSQLHeredocs | ||
class FixSingletonUniqueConstraint < ActiveRecord::Migration[5.2] | ||
disable_ddl_transaction! | ||
tag :predeploy | ||
|
||
def up | ||
rename_index :delayed_jobs, "index_delayed_jobs_on_singleton_not_running", "index_delayed_jobs_on_singleton_not_running_old" | ||
rename_index :delayed_jobs, "index_delayed_jobs_on_singleton_running", "index_delayed_jobs_on_singleton_running_old" | ||
|
||
# only one job can be queued in a singleton | ||
add_index :delayed_jobs, | ||
:singleton, | ||
where: "singleton IS NOT NULL AND (locked_by IS NULL OR locked_by = '#{::Delayed::Backend::Base::ON_HOLD_LOCKED_BY}')", | ||
unique: true, | ||
name: "index_delayed_jobs_on_singleton_not_running", | ||
algorithm: :concurrently, | ||
if_not_exists: true | ||
|
||
# only one job can be running for a singleton | ||
add_index :delayed_jobs, | ||
:singleton, | ||
where: "singleton IS NOT NULL AND locked_by IS NOT NULL AND locked_by <> '#{::Delayed::Backend::Base::ON_HOLD_LOCKED_BY}'", | ||
unique: true, | ||
name: "index_delayed_jobs_on_singleton_running", | ||
algorithm: :concurrently, | ||
if_not_exists: true | ||
end | ||
|
||
def down | ||
remove_index :delayed_jobs, name: "index_delayed_jobs_on_singleton_not_running_old", if_exists: true | ||
remove_index :delayed_jobs, name: "index_delayed_jobs_on_singleton_running_old", if_exists: true | ||
end | ||
end | ||
# rubocop:enable Rails/SquishedSQLHeredocs |
80 changes: 80 additions & 0 deletions
80
db/migrate/20220128084800_update_insert_trigger_for_singleton_unique_constraint_change.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# frozen_string_literal: true | ||
|
||
# Copyright (C) 2021 - present Instructure, Inc. | ||
# | ||
# This file is part of Canvas. | ||
# | ||
# Canvas is free software: you can redistribute it and/or modify it under | ||
# the terms of the GNU Affero General Public License as published by the Free | ||
# Software Foundation, version 3 of the License. | ||
# | ||
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY | ||
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR | ||
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more | ||
# details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License along | ||
# with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
# rubocop:disable Rails/SquishedSQLHeredocs | ||
class UpdateInsertTriggerForSingletonUniqueConstraintChange < ActiveRecord::Migration[5.2] | ||
tag :predeploy | ||
|
||
def change | ||
reversible do |direction| | ||
direction.up do | ||
execute(<<~SQL) | ||
CREATE OR REPLACE FUNCTION #{connection.quote_table_name("delayed_jobs_before_insert_row_tr_fn")} () RETURNS trigger AS $$ | ||
BEGIN | ||
IF NEW.strand IS NOT NULL THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(NEW.strand)); | ||
IF (SELECT COUNT(*) FROM ( | ||
SELECT 1 FROM delayed_jobs WHERE strand = NEW.strand AND next_in_strand=true LIMIT NEW.max_concurrent | ||
) s) = NEW.max_concurrent THEN | ||
NEW.next_in_strand := false; | ||
END IF; | ||
END IF; | ||
IF NEW.singleton IS NOT NULL THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(CONCAT('singleton:', NEW.singleton))); | ||
-- this condition seems silly, but it forces postgres to use the two partial indexes on singleton, | ||
-- rather than doing a seq scan | ||
PERFORM 1 FROM delayed_jobs WHERE singleton = NEW.singleton AND (locked_by IS NULL OR locked_by = '#{::Delayed::Backend::Base::ON_HOLD_LOCKED_BY}' OR locked_by <> '#{::Delayed::Backend::Base::ON_HOLD_LOCKED_BY}'); | ||
IF FOUND THEN | ||
NEW.next_in_strand := false; | ||
END IF; | ||
END IF; | ||
RETURN NEW; | ||
END; | ||
$$ LANGUAGE plpgsql SET search_path TO #{::Switchman::Shard.current.name}; | ||
SQL | ||
end | ||
direction.down do | ||
execute(<<~SQL) | ||
CREATE OR REPLACE FUNCTION #{connection.quote_table_name("delayed_jobs_before_insert_row_tr_fn")} () RETURNS trigger AS $$ | ||
BEGIN | ||
IF NEW.strand IS NOT NULL THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(NEW.strand)); | ||
IF (SELECT COUNT(*) FROM ( | ||
SELECT 1 FROM delayed_jobs WHERE strand = NEW.strand AND next_in_strand=true LIMIT NEW.max_concurrent | ||
) s) = NEW.max_concurrent THEN | ||
NEW.next_in_strand := false; | ||
END IF; | ||
END IF; | ||
IF NEW.singleton IS NOT NULL THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(CONCAT('singleton:', NEW.singleton))); | ||
-- this condition seems silly, but it forces postgres to use the two partial indexes on singleton, | ||
-- rather than doing a seq scan | ||
PERFORM 1 FROM delayed_jobs WHERE singleton = NEW.singleton AND (locked_by IS NULL OR locked_by IS NOT NULL); | ||
IF FOUND THEN | ||
NEW.next_in_strand := false; | ||
END IF; | ||
END IF; | ||
RETURN NEW; | ||
END; | ||
$$ LANGUAGE plpgsql SET search_path TO #{::Switchman::Shard.current.name}; | ||
SQL | ||
end | ||
end | ||
end | ||
end | ||
# rubocop:enable Rails/SquishedSQLHeredocs |
231 changes: 231 additions & 0 deletions
231
db/migrate/20220128084900_update_delete_trigger_for_singleton_unique_constraint_change.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
# frozen_string_literal: true | ||
|
||
# | ||
# Copyright (C) 2021 - present Instructure, Inc. | ||
# | ||
# This file is part of Canvas. | ||
# | ||
# Canvas is free software: you can redistribute it and/or modify it under | ||
# the terms of the GNU Affero General Public License as published by the Free | ||
# Software Foundation, version 3 of the License. | ||
# | ||
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY | ||
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR | ||
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more | ||
# details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License along | ||
# with this program. If not, see <http://www.gnu.org/licenses/>. | ||
# | ||
|
||
# rubocop:disable Rails/SquishedSQLHeredocs | ||
class UpdateDeleteTriggerForSingletonUniqueConstraintChange < ActiveRecord::Migration[5.2] | ||
tag :predeploy | ||
|
||
def up | ||
execute(<<~SQL) | ||
CREATE OR REPLACE FUNCTION #{connection.quote_table_name("delayed_jobs_after_delete_row_tr_fn")} () RETURNS trigger AS $$ | ||
DECLARE | ||
next_strand varchar; | ||
running_count integer; | ||
should_lock boolean; | ||
should_be_precise boolean; | ||
update_query varchar; | ||
skip_locked varchar; | ||
transition boolean; | ||
BEGIN | ||
IF OLD.strand IS NOT NULL THEN | ||
should_lock := true; | ||
should_be_precise := OLD.id % (OLD.max_concurrent * 4) = 0; | ||
IF NOT should_be_precise AND OLD.max_concurrent > 16 THEN | ||
running_count := (SELECT COUNT(*) FROM ( | ||
SELECT 1 as one FROM delayed_jobs WHERE strand = OLD.strand AND next_in_strand = 't' LIMIT OLD.max_concurrent | ||
) subquery_for_count); | ||
should_lock := running_count < OLD.max_concurrent; | ||
END IF; | ||
IF should_lock THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(OLD.strand)); | ||
END IF; | ||
-- note that we don't really care if the row we're deleting has a singleton, or if it even | ||
-- matches the row(s) we're going to update. we just need to make sure that whatever | ||
-- singleton we grab isn't already running (which is a simple existence check, since | ||
-- the unique indexes ensure there is at most one singleton running, and one queued) | ||
update_query := 'UPDATE delayed_jobs SET next_in_strand=true WHERE id IN ( | ||
SELECT id FROM delayed_jobs j2 | ||
WHERE next_in_strand=false AND | ||
j2.strand=$1.strand AND | ||
(j2.singleton IS NULL OR NOT EXISTS (SELECT 1 FROM delayed_jobs j3 WHERE j3.singleton=j2.singleton AND j3.id<>j2.id AND (j3.locked_by IS NULL OR j3.locked_by = ''#{::Delayed::Backend::Base::ON_HOLD_LOCKED_BY}'' OR j3.locked_by <> ''#{::Delayed::Backend::Base::ON_HOLD_LOCKED_BY}''))) | ||
ORDER BY j2.strand_order_override ASC, j2.id ASC | ||
LIMIT '; | ||
IF should_be_precise THEN | ||
running_count := (SELECT COUNT(*) FROM ( | ||
SELECT 1 FROM delayed_jobs WHERE strand = OLD.strand AND next_in_strand = 't' LIMIT OLD.max_concurrent | ||
) s); | ||
IF running_count < OLD.max_concurrent THEN | ||
update_query := update_query || '($1.max_concurrent - $2)'; | ||
ELSE | ||
-- we have too many running already; just bail | ||
RETURN OLD; | ||
END IF; | ||
ELSE | ||
update_query := update_query || '1'; | ||
-- n-strands don't require precise ordering; we can make this query more performant | ||
IF OLD.max_concurrent > 1 THEN | ||
skip_locked := ' SKIP LOCKED'; | ||
END IF; | ||
END IF; | ||
update_query := update_query || ' FOR UPDATE' || COALESCE(skip_locked, '') || ')'; | ||
EXECUTE update_query USING OLD, running_count; | ||
END IF; | ||
IF OLD.singleton IS NOT NULL THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(CONCAT('singleton:', OLD.singleton))); | ||
transition := EXISTS (SELECT 1 FROM delayed_jobs AS j1 WHERE j1.singleton = OLD.singleton AND j1.strand IS DISTINCT FROM OLD.strand AND locked_by IS NULL); | ||
IF transition THEN | ||
next_strand := (SELECT j1.strand FROM delayed_jobs AS j1 WHERE j1.singleton = OLD.singleton AND j1.strand IS DISTINCT FROM OLD.strand AND locked_by IS NULL AND j1.strand IS NOT NULL LIMIT 1); | ||
IF next_strand IS NOT NULL THEN | ||
-- if the singleton has a new strand defined, we need to lock it to ensure we obey n_strand constraints -- | ||
IF NOT pg_try_advisory_xact_lock(half_md5_as_bigint(next_strand)) THEN | ||
-- a failure to acquire the lock means that another process already has it and will thus handle this singleton -- | ||
RETURN OLD; | ||
END IF; | ||
END IF; | ||
ELSIF OLD.strand IS NOT NULL THEN | ||
-- if there is no transition and there is a strand then we have already handled this singleton in the case above -- | ||
RETURN OLD; | ||
END IF; | ||
-- handles transitioning a singleton from stranded to not stranded -- | ||
-- handles transitioning a singleton from unstranded to stranded -- | ||
-- handles transitioning a singleton from strand A to strand B -- | ||
-- these transitions are a relatively rare case, so we take a shortcut and -- | ||
-- only start the next singleton if its strand does not currently have any running jobs -- | ||
-- if it does, the next stranded job that finishes will start this singleton if it can -- | ||
UPDATE delayed_jobs SET next_in_strand=true WHERE id IN ( | ||
SELECT id FROM delayed_jobs j2 | ||
WHERE next_in_strand=false AND | ||
j2.singleton=OLD.singleton AND | ||
j2.locked_by IS NULL AND | ||
(j2.strand IS NULL OR NOT EXISTS (SELECT 1 FROM delayed_jobs j3 WHERE j3.strand=j2.strand AND j3.id<>j2.id)) | ||
FOR UPDATE | ||
); | ||
END IF; | ||
RETURN OLD; | ||
END; | ||
$$ LANGUAGE plpgsql SET search_path TO #{::Switchman::Shard.current.name}; | ||
SQL | ||
end | ||
|
||
def down | ||
execute(<<~SQL) | ||
CREATE OR REPLACE FUNCTION #{connection.quote_table_name("delayed_jobs_after_delete_row_tr_fn")} () RETURNS trigger AS $$ | ||
DECLARE | ||
next_strand varchar; | ||
running_count integer; | ||
should_lock boolean; | ||
should_be_precise boolean; | ||
update_query varchar; | ||
skip_locked varchar; | ||
transition boolean; | ||
BEGIN | ||
IF OLD.strand IS NOT NULL THEN | ||
should_lock := true; | ||
should_be_precise := OLD.id % (OLD.max_concurrent * 4) = 0; | ||
IF NOT should_be_precise AND OLD.max_concurrent > 16 THEN | ||
running_count := (SELECT COUNT(*) FROM ( | ||
SELECT 1 as one FROM delayed_jobs WHERE strand = OLD.strand AND next_in_strand = 't' LIMIT OLD.max_concurrent | ||
) subquery_for_count); | ||
should_lock := running_count < OLD.max_concurrent; | ||
END IF; | ||
IF should_lock THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(OLD.strand)); | ||
END IF; | ||
-- note that we don't really care if the row we're deleting has a singleton, or if it even | ||
-- matches the row(s) we're going to update. we just need to make sure that whatever | ||
-- singleton we grab isn't already running (which is a simple existence check, since | ||
-- the unique indexes ensure there is at most one singleton running, and one queued) | ||
update_query := 'UPDATE delayed_jobs SET next_in_strand=true WHERE id IN ( | ||
SELECT id FROM delayed_jobs j2 | ||
WHERE next_in_strand=false AND | ||
j2.strand=$1.strand AND | ||
(j2.singleton IS NULL OR NOT EXISTS (SELECT 1 FROM delayed_jobs j3 WHERE j3.singleton=j2.singleton AND j3.id<>j2.id AND (j3.locked_by IS NULL OR j3.locked_by IS NOT NULL))) | ||
ORDER BY j2.strand_order_override ASC, j2.id ASC | ||
LIMIT '; | ||
IF should_be_precise THEN | ||
running_count := (SELECT COUNT(*) FROM ( | ||
SELECT 1 FROM delayed_jobs WHERE strand = OLD.strand AND next_in_strand = 't' LIMIT OLD.max_concurrent | ||
) s); | ||
IF running_count < OLD.max_concurrent THEN | ||
update_query := update_query || '($1.max_concurrent - $2)'; | ||
ELSE | ||
-- we have too many running already; just bail | ||
RETURN OLD; | ||
END IF; | ||
ELSE | ||
update_query := update_query || '1'; | ||
-- n-strands don't require precise ordering; we can make this query more performant | ||
IF OLD.max_concurrent > 1 THEN | ||
skip_locked := ' SKIP LOCKED'; | ||
END IF; | ||
END IF; | ||
update_query := update_query || ' FOR UPDATE' || COALESCE(skip_locked, '') || ')'; | ||
EXECUTE update_query USING OLD, running_count; | ||
END IF; | ||
IF OLD.singleton IS NOT NULL THEN | ||
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(CONCAT('singleton:', OLD.singleton))); | ||
transition := EXISTS (SELECT 1 FROM delayed_jobs AS j1 WHERE j1.singleton = OLD.singleton AND j1.strand IS DISTINCT FROM OLD.strand AND locked_by IS NULL); | ||
IF transition THEN | ||
next_strand := (SELECT j1.strand FROM delayed_jobs AS j1 WHERE j1.singleton = OLD.singleton AND j1.strand IS DISTINCT FROM OLD.strand AND locked_by IS NULL AND j1.strand IS NOT NULL LIMIT 1); | ||
IF next_strand IS NOT NULL THEN | ||
-- if the singleton has a new strand defined, we need to lock it to ensure we obey n_strand constraints -- | ||
IF NOT pg_try_advisory_xact_lock(half_md5_as_bigint(next_strand)) THEN | ||
-- a failure to acquire the lock means that another process already has it and will thus handle this singleton -- | ||
RETURN OLD; | ||
END IF; | ||
END IF; | ||
ELSIF OLD.strand IS NOT NULL THEN | ||
-- if there is no transition and there is a strand then we have already handled this singleton in the case above -- | ||
RETURN OLD; | ||
END IF; | ||
-- handles transitioning a singleton from stranded to not stranded -- | ||
-- handles transitioning a singleton from unstranded to stranded -- | ||
-- handles transitioning a singleton from strand A to strand B -- | ||
-- these transitions are a relatively rare case, so we take a shortcut and -- | ||
-- only start the next singleton if its strand does not currently have any running jobs -- | ||
-- if it does, the next stranded job that finishes will start this singleton if it can -- | ||
UPDATE delayed_jobs SET next_in_strand=true WHERE id IN ( | ||
SELECT id FROM delayed_jobs j2 | ||
WHERE next_in_strand=false AND | ||
j2.singleton=OLD.singleton AND | ||
j2.locked_by IS NULL AND | ||
(j2.strand IS NULL OR NOT EXISTS (SELECT 1 FROM delayed_jobs j3 WHERE j3.strand=j2.strand AND j3.id<>j2.id)) | ||
FOR UPDATE | ||
); | ||
END IF; | ||
RETURN OLD; | ||
END; | ||
$$ LANGUAGE plpgsql SET search_path TO #{::Switchman::Shard.current.name}; | ||
SQL | ||
end | ||
end | ||
# rubocop:enable Rails/SquishedSQLHeredocs |
Oops, something went wrong.