Improve complete job #3443

Merged: 7 commits, Oct 18, 2024
Changes from 1 commit
improve complete_job
antgonza committed Oct 18, 2024
commit 33e33203831ba822a0c4c5ed5f60e5fc2039aaa8
4 changes: 3 additions & 1 deletion qiita_db/handlers/processing_job.py
@@ -146,7 +146,9 @@ def post(self, job_id):
             cmd, values_dict={'job_id': job_id,
                               'payload': self.request.body.decode(
                                   'ascii')})
-        job = qdb.processing_job.ProcessingJob.create(job.user, params)
+        # complete_job are unique so it is fine to force them to be created
+        job = qdb.processing_job.ProcessingJob.create(
+            job.user, params, force=True)
         job.submit()
 
         self.finish()
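For context on why `force=True` is safe here, a minimal sketch of the handler's new flow, assuming a working Qiita install; `user` and `params` stand in for the values the handler builds from the request:

    import qiita_db as qdb


    def complete_job_handler(user, params):
        """Sketch of the handler's new behavior (user/params are stand-ins)."""
        # Without force, create() first scans qiita.processing_job for a
        # queued, running or successful job with identical parameters and
        # raises ValueError on a match; on ~300k complete_job rows that scan
        # alone can take ~52 s (see the 93.sql patch below).
        # Every complete_job payload embeds its parent job_id, so duplicates
        # are not a concern and the check can be skipped safely.
        job = qdb.processing_job.ProcessingJob.create(user, params, force=True)
        job.submit()
        return job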
96 changes: 48 additions & 48 deletions qiita_db/processing_job.py
@@ -581,56 +581,56 @@ def create(cls, user, parameters, force=False):
"""
TTRN = qdb.sql_connection.TRN
with TTRN:
command = parameters.command
if not force:
command = parameters.command

# check if a job with the same parameters already exists
sql = """SELECT processing_job_id, email, processing_job_status,
COUNT(aopj.artifact_id)
FROM qiita.processing_job
LEFT JOIN qiita.processing_job_status
USING (processing_job_status_id)
LEFT JOIN qiita.artifact_output_processing_job aopj
USING (processing_job_id)
WHERE command_id = %s AND processing_job_status IN (
'success', 'waiting', 'running', 'in_construction') {0}
GROUP BY processing_job_id, email,
processing_job_status"""

# we need to use ILIKE because of booleans as they can be
# false or False
params = []
for k, v in parameters.values.items():
# this is necessary in case we have an Iterable as a value
# but that is string
if isinstance(v, Iterable) and not isinstance(v, str):
for vv in v:
params.extend([k, str(vv)])
# check if a job with the same parameters already exists
sql = """SELECT processing_job_id, processing_job_status
FROM qiita.processing_job
LEFT JOIN qiita.processing_job_status
USING (processing_job_status_id)
LEFT JOIN qiita.artifact_output_processing_job aopj
USING (processing_job_id)
WHERE command_id = %s AND processing_job_status IN (
'success', 'waiting', 'running', 'in_construction')
{0}"""

# we need to use ILIKE because of booleans as they can be
# false or False
params = []
for k, v in parameters.values.items():
# this is necessary in case we have an Iterable as a value
# but that is string
if isinstance(v, Iterable) and not isinstance(v, str):
for vv in v:
params.extend([k, str(vv)])
else:
params.extend([k, str(v)])

if params:
# divided by 2 as we have key-value pairs
len_params = int(len(params)/2)
sql = sql.format(' AND ' + ' AND '.join(
["command_parameters->>%s = %s"] * len_params))
params = [command.id] + params
TTRN.add(sql, params)
else:
params.extend([k, str(v)])

if params:
# divided by 2 as we have key-value pairs
len_params = int(len(params)/2)
sql = sql.format(' AND ' + ' AND '.join(
["command_parameters->>%s ILIKE %s"] * len_params))
params = [command.id] + params
TTRN.add(sql, params)
else:
# the sql variable expects the list of parameters but if there
# is no param we need to replace the {0} with an empty string
TTRN.add(sql.format(""), [command.id])

# checking that if the job status is success, it has children
# [2] status, [3] children count
existing_jobs = [r for r in TTRN.execute_fetchindex()
if r[2] != 'success' or r[3] > 0]
if existing_jobs and not force:
raise ValueError(
'Cannot create job because the parameters are the same as '
'jobs that are queued, running or already have '
'succeeded:\n%s' % '\n'.join(
["%s: %s" % (jid, status)
for jid, _, status, _ in existing_jobs]))
# the sql variable expects the list of parameters but if
# there is no param we need to replace the {0} with an
# empty string
TTRN.add(sql.format(""), [command.id])

# checking that if the job status is success, it has children
# [2] status, [3] children count
existing_jobs = [r for r in TTRN.execute_fetchindex()
if r[2] != 'success' or r[3] > 0]
if existing_jobs:
raise ValueError(
'Cannot create job because the parameters are the '
'same as jobs that are queued, running or already '
'have succeeded:\n%s' % '\n'.join(
["%s: %s" % (jid, status)
for jid, _, status, _ in existing_jobs]))

sql = """INSERT INTO qiita.processing_job
(email, command_id, command_parameters,
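To make the duplicate check concrete, here is a minimal standalone sketch of the flattening loop and the `{0}` formatting from the block above; the `values` dict is hypothetical. It also shows why the retained comment mentions ILIKE: Python's `str(False)` yields `'False'` while json stores `false`, so the original code matched case-insensitively (this commit switches the operator to `=`).

    from collections.abc import Iterable

    # hypothetical parameter values for a job
    values = {'job_id': '3432a908-f7b8-4e36-89fc-88f3310b84d5',
              'samples': ['S1', 'S2'],
              'trim': False}

    params = []
    for k, v in values.items():
        # a non-string Iterable contributes one key-value pair per element
        if isinstance(v, Iterable) and not isinstance(v, str):
            for vv in v:
                params.extend([k, str(vv)])
        else:
            params.extend([k, str(v)])

    # params holds key-value pairs, so half its length is the predicate count
    len_params = int(len(params) / 2)
    clause = ' AND ' + ' AND '.join(
        ["command_parameters->>%s = %s"] * len_params)
    print(len_params)  # 4: job_id, samples (x2), trim
    print(clause)      # AND command_parameters->>%s = %s AND ... (4 times)

When `values` is empty, `params` stays empty and the else branch substitutes an empty string for `{0}`, so the query filters on `command_id` alone.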
56 changes: 56 additions & 0 deletions qiita_db/support_files/patches/93.sql
@@ -0,0 +1,56 @@
-- Oct 18, 2024
-- ProcessingJob.create can take up to 52 seconds when creating a complete_job,
-- mainly because of the number of jobs for this command and the use of json.
-- The solution in the database is to convert the column to jsonb and index
-- its values

-- ### These are the stats before the change, from a single example
-- GroupAggregate  (cost=67081.81..67081.83 rows=1 width=77) (actual time=51859.962..51862.637 rows=1 loops=1)
--   Group Key: processing_job.processing_job_id, processing_job_status.processing_job_status
--   ->  Sort  (cost=67081.81..67081.81 rows=1 width=77) (actual time=51859.952..51862.627 rows=1 loops=1)
--         Sort Key: processing_job.processing_job_id, processing_job_status.processing_job_status
--         Sort Method: quicksort  Memory: 25kB
--         ->  Nested Loop Left Join  (cost=4241.74..67081.80 rows=1 width=77) (actual time=51859.926..51862.604 rows=1 loops=1)
--               ->  Nested Loop  (cost=4237.30..67069.64 rows=1 width=69) (actual time=51859.889..51862.566 rows=1 loops=1)
--                     Join Filter: (processing_job.processing_job_status_id = processing_job_status.processing_job_status_id)
--                     Rows Removed by Join Filter: 1
--                     ->  Gather  (cost=4237.30..67068.50 rows=1 width=45) (actual time=51859.846..51862.522 rows=1 loops=1)
--                           Workers Planned: 2
--                           Workers Launched: 2
--                           ->  Parallel Bitmap Heap Scan on processing_job  (cost=3237.30..66068.40 rows=1 width=45) (actual time=51785.317..51785.446 rows=0 loops=3)
--                                 Recheck Cond: (command_id = 83)
--                                 Filter: (((command_parameters ->> 'job_id'::text) ~~* '3432a908-f7b8-4e36-89fc-88f3310b84d5'::text) AND ((command_parameters ->> 'payload'::text) ~~* '{"success": true, "error": "", "artifacts": {"alpha_diversity": {"artifact_type": "alpha_vector", "filepaths": [["/qmounts/qiita_test_data/testlocal/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity/alpha-diversity.tsv", "plain_text"], ["/qmounts/qiita_test_data/testlocal/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity.qza", "qza"]], "archive": {}}}}'::text))
--                                 Rows Removed by Filter: 97315
--                                 Heap Blocks: exact=20133
--                                 ->  Bitmap Index Scan on idx_processing_job_command_id  (cost=0.00..3237.30 rows=294517 width=0) (actual time=41.569..41.569 rows=293054 loops=1)
--                                       Index Cond: (command_id = 83)
--                     ->  Seq Scan on processing_job_status  (cost=0.00..1.09 rows=4 width=40) (actual time=0.035..0.035 rows=2 loops=1)
--                           Filter: ((processing_job_status)::text = ANY ('{success,waiting,running,in_construction}'::text[]))
--                           Rows Removed by Filter: 1
--               ->  Bitmap Heap Scan on artifact_output_processing_job aopj  (cost=4.43..12.14 rows=2 width=24) (actual time=0.031..0.031 rows=0 loops=1)
--                     Recheck Cond: (processing_job.processing_job_id = processing_job_id)
--                     ->  Bitmap Index Scan on idx_artifact_output_processing_job_job  (cost=0.00..4.43 rows=2 width=0) (actual time=0.026..0.026 rows=0 loops=1)
--                           Index Cond: (processing_job_id = processing_job.processing_job_id)
-- Planning Time: 1.173 ms
-- Execution Time: 51862.756 ms

-- Note: for this to work you need to have created, as admin, the extension:
-- CREATE EXTENSION pg_trgm;

-- This alter table will take close to 11 min
ALTER TABLE qiita.processing_job
ALTER COLUMN command_parameters TYPE JSONB USING command_parameters::jsonb;

-- This indexing will take about 5 min
CREATE INDEX processing_job_command_parameters_job_id ON qiita.processing_job
USING GIN((command_parameters->>'job_id') gin_trgm_ops);

-- This indexing will take about an hour
CREATE INDEX processing_job_command_parameters_payload ON qiita.processing_job
USING GIN((command_parameters->>'payload') gin_trgm_ops);

-- After the changes, the same example query takes
-- 18710.404 ms
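After applying the patch, a hedged sanity check (not part of the patch; command_id 83 and the job_id are the example values from the plan above) is to re-run the hot lookup under EXPLAIN and confirm the planner now uses the trigram index:

    -- hypothetical values taken from the pre-change plan above
    EXPLAIN ANALYZE
    SELECT processing_job_id
      FROM qiita.processing_job
     WHERE command_id = 83
       AND command_parameters->>'job_id'
           ILIKE '3432a908-f7b8-4e36-89fc-88f3310b84d5';
    -- expected: a Bitmap Index Scan on
    -- processing_job_command_parameters_job_id instead of the row-by-row
    -- Filter that removed ~97k rows per worker in the old plan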