Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for job type filtering on workers and pool workers #292

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

kydemy-fran
Copy link

I have added an option to Workers and PoolWorkers so they can filter/poll only jobs of specific job types.
This way you can have several pools or services polling from the same queue, and not having to implement all job types or define a handler for unknown types. (specially if a new type is added later on)

Also, with this implementation workers will not lock unsupported jobs, blocking other workers that can process those jobs/rows.

There is more info in this issue: #291

I've created some functions to generate the final queries using constants, as the queries are not static anymore.

PS: If this gets merged, README/examples/documentation might need to be updated. Although the signatures are not changed and is fully compatible with the previous version.


return c.execLockJob(ctx, true, sql, queue, time.Now().UTC())
func (c *Client) LockJob(ctx context.Context, queue string, jobTypes ...string) (*Job, error) {
sql, args := newLockJobQuery(queue, jobTypes)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is nice from the DRY point of view, but makes code readability way worse

I prefer having full queries here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that now the job_type condition is not fixed as the previous conditions where before.
So to keep it there we need:

  • Define the first part of the SQL
  • Define the base Where clause
  • Define the base args
  • If there are job types, append to the where clause the condition and append to args the array
  • Add the end of the query
  • Call the exec
    As you can see it gets messy and a ton of duplicated lines among functions.

Could you suggest another approach that would not make those functions messy?

PS: I've been always trying to separate as much as possible logic from SQL, as you separate HTML/CSS from logic.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you suggest another approach that would not make those functions messy?

sql := `SELECT ... `
if len(jobTypes) > 0 {
  sql = `SELECT ...`
}

and tests that will cover branching

@@ -12,4 +12,4 @@ CREATE TABLE IF NOT EXISTS gue_jobs
updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, run_at, priority);
CREATE INDEX IF NOT EXISTS idx_gue_jobs_selector ON gue_jobs (queue, job_type, run_at, priority);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is backward-compatibility breaking change actually

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it a breaking change? if the index exists it will ignore the line.
Do you want me to add an additional index for this?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because w/out this change new functionality may cause performance issues, and given that gue is just a library - the app using it may start experiencing serious issues, e.g. running out of available DB connections because gue worker queries are taking too long

previously all DB changes were part of the major version update

I'm fine to let this index change in the minor version update, but then you need to prepare a migration that can be mentioned in the change notes

@@ -31,6 +31,13 @@ func WithWorkerQueue(queue string) WorkerOption {
}
}

// WithWorkerJobTypes limits/filters the job types this worker will fetch from the DB.
func WithWorkerJobTypes(jobTypes []string) WorkerOption {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be variadic to keep the options consistent

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change! 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants