
Middleware feature #1292

Open
medihack opened this issue Jan 18, 2025 · 15 comments · May be fixed by #1317

Comments

@medihack
Member

medihack commented Jan 18, 2025

In #1262 and #1289 we mentioned that a middleware feature would be helpful in some scenarios. In the middleware branch, I have already started to implement it, but I would like to discuss the implementation (@ewjoachim). My implementation is similar to Django's middleware. There are two API alternatives I'd like to compare.

  1. The way I implemented it currently:
ProcessTask: TypeAlias = Callable[[tasks.Task, job_context.JobContext], Awaitable[Any]]
Middleware: TypeAlias = Callable[
    [ProcessTask],
    Callable[[tasks.Task, job_context.JobContext], Awaitable[Any]],
]

def simple_middleware(process_task: ProcessTask):
    async def middleware(task: tasks.Task, context: job_context.JobContext):
        # Do something before the task is processed
        result = await process_task(task, context)
        # Do something after the task is processed
        return result
    return middleware
  2. An alternative (maybe better) way:
ProcessJob: TypeAlias = Callable[[job_context.JobContext], Awaitable[Any]]
Middleware: TypeAlias = Callable[
    [ProcessJob],
    Callable[[job_context.JobContext], Awaitable[Any]],
]

def simple_middleware(process_job: ProcessJob):
    async def middleware(context: job_context.JobContext):
        # Do something before the job is processed
        result = await process_job(context)
        # Do something after the job is processed
        return result
    return middleware

After some thought, 2) is better because process_job also wraps the part where the job status is saved back to the db. This is especially important if we allow the user to raise a StopWorker exception (#1262 (comment)).
One thing that bothers me a bit is that in both scenarios, the context was already created, and thereby a start_timestamp is already set. So when the user adds some pause in the middleware (e.g. for #1289) before the job is processed, this pause will be included in the overall job duration. I'm not sure if this is what we want or not.

@ewjoachim
Member

One thing that bothers me a bit is that in both scenarios, the context was already created, and thereby a start_timestamp is already set

Aha, you assume that the process_job function of the lowest middleware is the real process_job, but we could have our own "last middleware", that sets this right before the task actually starts.
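To make that concrete, here is a rough sketch of the "last middleware" idea (the `JobContext`, middleware names, and composition below are simplified stand-ins for illustration, not procrastinate's actual classes):

```python
import asyncio
import time
from dataclasses import dataclass, replace
from typing import Optional

# Hypothetical stand-in for procrastinate's job context.
@dataclass(frozen=True)
class JobContext:
    job_id: int
    start_timestamp: Optional[float] = None

async def real_process_job(context: JobContext):
    # The actual job execution; start_timestamp is set by now.
    return f"job {context.job_id} started at {context.start_timestamp}"

def timestamp_middleware(process_job):
    # The worker's own innermost "last middleware": it re-creates the
    # (immutable) context with a fresh start_timestamp right before the
    # job runs, so pauses added by user middleware are excluded from the
    # measured job duration.
    async def wrapper(context: JobContext):
        return await process_job(replace(context, start_timestamp=time.time()))
    return wrapper

def user_pause_middleware(process_job):
    # A user middleware that pauses before processing (e.g. throttling).
    async def wrapper(context: JobContext):
        await asyncio.sleep(0.05)
        return await process_job(context)
    return wrapper

# User middleware wraps the worker's timestamp middleware, which wraps
# the real processing function.
chain = user_pause_middleware(timestamp_middleware(real_process_job))
result = asyncio.run(chain(JobContext(job_id=1)))
```

With this layering, the user never sees a context whose `start_timestamp` could be skewed by their own pauses.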

There's a 3rd alternative (not saying it's better, but I think it's simpler):

async def middleware(process_job: ProcessJob, context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job(context)
    # Do something after the job is processed
    return result

(Your implementation highlights the fact that we will always have the same process_job, while it's less visible in mine.)

The 3rd implementation puts the focus on the idea that the middleware could modify the job_context (well, it's immutable, but it could pass along a different instance). I'm still trying to think through whether that's a good idea.

A 4th alternative could be:

async def middleware(process_job: ..., context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job()
    # Do something after the job is processed
    return result

(note that not much value would be lost if we didn't bother returning the job's result)

(Or even, 5th:

async def middleware(process_job: Awaitable, context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job
    # Do something after the job is processed
    return result

but in this case, in the event one would choose not to execute the task, they'd get a "coroutine was never awaited" RuntimeWarning)

I think my favorite one might be the 3rd, but I'm open to discussion if you think the 2nd (your preferred?) is better.

@medihack
Member Author

One thing that bothers me a bit is that in both scenarios, the context was already created, and thereby a start_timestamp is already set

Aha, you assume that the process_job function of the lowest middleware is the real process_job, but we could have our own "last middleware", that sets this right before the task actually starts.

So, you mean that the context we pass to the middleware is not the same context we internally use to process the task? We re-create the context in our process_job function and then use that one?

There's a 3rd alternative (not saying it's better, but I think it's simpler):

async def middleware(process_job: ProcessJob, context: job_context.JobContext):
    # Do something before the job is processed
    result = await process_job(context)
    # Do something after the job is processed
    return result

(Your implementation highlights the fact that we will always have the same process_job, while it's less visible in mine.)

Yes, I have taken the Django middleware as a model (https://docs.djangoproject.com/en/5.1/topics/http/middleware/#writing-your-own-middleware) and thought it would be a good fit as our process_job is, like the get_response function in Django, always the same one. But I don't have a strong opinion about that.

If we recreate the context and don't use the one passed to the middleware, the 4th alternative may be better because it clarifies that the context can't be changed (or, more precisely, that changing it would not affect anything).

@onlyann
Contributor

onlyann commented Jan 27, 2025

I am not sure that rate limiting is best solved by middleware.
By the time the middleware runs, the job is already fetched and in a doing status.
One could probably achieve some sort of rate limiting through middleware, but I don't think that would be efficient or practical for rate limiting across multiple workers.

If all we want with the middleware is to inject logic before/after the task, I would lean towards what @ewjoachim labels option 3.
I would just name the function differently:

async def middleware(next: ProcessTask, context: job_context.JobContext):
    # Do something before the job is processed
    result = await next(context)
    # Do something after the job is processed
    return result
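For illustration, here is a rough sketch of how a worker could fold a list of such middlewares into a single callable (the function names, the dict-based context, and the trace field are all made up for the example, not procrastinate's API):

```python
import asyncio
import functools

# Innermost callable: stand-in for the real job processing.
async def process_job(context):
    return {"result": context["job"], "trace": context.setdefault("trace", [])}

# Option-3 style middlewares: async def middleware(next, context).
async def log_middleware(next, context):
    context.setdefault("trace", []).append("log:before")
    result = await next(context)
    context["trace"].append("log:after")
    return result

async def auth_middleware(next, context):
    context.setdefault("trace", []).append("auth:before")
    return await next(context)

def build_chain(middlewares, innermost):
    # Fold right-to-left so the first middleware in the list is outermost.
    chain = innermost
    for mw in reversed(middlewares):
        chain = functools.partial(mw, chain)
    return chain

chain = build_chain([log_middleware, auth_middleware], process_job)
out = asyncio.run(chain({"job": 42}))
# trace ends up as ["log:before", "auth:before", "log:after"]
```

The `functools.partial` fold means each middleware only ever sees a `next` callable taking the context, regardless of how many layers sit below it.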

I would be interested to see a few examples of useful middleware.

@ashleyheath
Contributor

I would be interested to see a few examples of useful middleware.

Things we currently do by monkey patching procrastinate that this could be useful for:

  • Wrap task execution in our own structured logging/OpenTelemetry spans/other instrumentation
  • Wrap the task in a common dependency injection container

I could also envision us using middleware to wrap certain tasks in an asyncio timeout.
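The timeout case could look roughly like this in option-3 style (a sketch with stand-in names and a dict context; the real signature would depend on whatever API is chosen):

```python
import asyncio

async def timeout_middleware(next, context, *, seconds=0.1):
    # Bound the task's execution time; on timeout, the inner coroutine
    # is cancelled and we report a substitute outcome instead.
    try:
        return await asyncio.wait_for(next(context), timeout=seconds)
    except asyncio.TimeoutError:
        return "timed out"

async def slow_job(context):
    await asyncio.sleep(1.0)
    return "done"

async def fast_job(context):
    return "done"

slow = asyncio.run(timeout_middleware(slow_job, {}, seconds=0.05))
fast = asyncio.run(timeout_middleware(fast_job, {}, seconds=0.05))
```

Whether a timeout should also affect the persisted job status is exactly the kind of question the "what does the middleware wrap" discussion below touches on.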

@medihack
Member Author

medihack commented Jan 27, 2025

I am not sure that rate limiting is best solved by middleware.

Yes, trying different implementations, I came to the same conclusion.

I even wonder if a middleware feature is flexible enough, or whether we should instead implement several callback hooks the user can plug into, e.g.:

  • before_job_fetched
  • after_job_fetched
  • after_job_processed

Or both, a middleware that wraps the job processing and a before_job_fetched callback.
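As a rough sketch of what such hooks could look like (the Hooks class, hook names, and worker loop below are hypothetical, purely to illustrate the shape of the API):

```python
import asyncio

class Hooks:
    # A minimal registry of named async callbacks.
    def __init__(self):
        self._hooks = {
            "before_job_fetched": [],
            "after_job_fetched": [],
            "after_job_processed": [],
        }

    def register(self, name, fn):
        self._hooks[name].append(fn)

    async def run(self, name, *args):
        for fn in self._hooks[name]:
            await fn(*args)

events = []
hooks = Hooks()

async def record_fetched(job):
    events.append(f"fetched:{job}")

async def record_processed(job):
    events.append(f"processed:{job}")

hooks.register("after_job_fetched", record_fetched)
hooks.register("after_job_processed", record_processed)

async def worker_iteration(job):
    # Sketch of where the worker would invoke each hook.
    await hooks.run("before_job_fetched")
    await hooks.run("after_job_fetched", job)
    # ... job would be processed here ...
    await hooks.run("after_job_processed", job)

asyncio.run(worker_iteration("job-1"))
```

Unlike a middleware, hooks can fire at points a wrapper cannot reach, such as before the job is even fetched.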

@onlyann
Contributor

onlyann commented Jan 27, 2025

My preference for rate limiting is to make it a distinct feature that is implemented at the DB level.

I will try something out and report back so we have something to discuss.

In the meantime, the middleware feature could be limited to chaining functions as part of task processing, which would help with examples shared above.

@medihack
Member Author

My preference for rate limiting is to make it a distinct feature that is implemented at the DB level.

Yes, a dedicated rate-limiting feature probably makes the most sense. And then provide a rate limit per task?

@ashleyheath
Contributor

In the meantime, the middleware feature could be limited to chaining functions as part of task processing, which would help with examples shared above.

@onlyann my only concern with the chaining approach is that it will make it harder to use python's native context managers, which in my experience is a large part of the usefulness of middleware-style patterns.

@onlyann
Contributor

onlyann commented Jan 28, 2025

@onlyann my only concern with the chaining approach is that it will make it harder to use python's native context managers, which in my experience is a large part of the usefulness of middleware-style patterns.

Context managers from contextlib?

Can you please give me an example where it wouldn't work with middleware?

@ashleyheath
Contributor

Sorry, I might have misinterpreted the above conversation - it seemed that the discussion had turned to implementing distinct before/after callbacks rather than a single middleware interface (i.e. your original sketch here). If that's still the plan then ignore me 😄

@medihack
Member Author

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

@onlyann
Contributor

onlyann commented Jan 28, 2025

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

If this is what is wrapped, there will be no ability for a middleware to modify the outcome of a task.

@medihack
Member Author

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

If this is what is wrapped, there will be no ability for a middleware to modify the outcome of a task.

What would you suggest? Wrap the ensure_async?

@onlyann
Contributor

onlyann commented Jan 28, 2025

After thinking about it even more, I will implement a middleware that wraps the worker._process_job. My hook plans are on hold, at least for now. I think @onlyann is right that a dedicated rate-limiting feature is better than using a hook for that.

If this is what is wrapped, there will be no ability for a middleware to modify the outcome of a task.

What would you suggest? Wrap the ensure_async?

Yes. That would be similar to the hand rolled middleware from the docs.

@medihack
Member Author

medihack commented Feb 1, 2025

Yes. That would be similar to the hand rolled middleware from the docs.

The main reason I wrap _process_job instead of ensure_async is that an exception can be raised more easily to stop the worker. But now I think raising an exception is not the way to go, as the job status depends too much on where that exception is raised (before or after calling the provided process_job function). A better alternative could be to pass the worker instance itself so that worker.stop() can be called from within the middleware. Then we can also wrap ensure_async instead of _process_job.
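A minimal sketch of that idea, with a stand-in Worker class and middleware signature (not procrastinate's actual worker API), where the middleware requests a clean stop instead of raising:

```python
import asyncio

# Hypothetical stand-in for the worker: stop() just records the request,
# and the real worker would exit its loop after the current job.
class Worker:
    def __init__(self):
        self.stop_requested = False

    def stop(self):
        self.stop_requested = True

async def process_job(context):
    return "ok"

async def stopping_middleware(process_job, worker, context):
    result = await process_job(context)
    if context.get("fatal"):
        # Request shutdown after the job has completed (and its status
        # could be saved), instead of raising inside the processing path.
        worker.stop()
    return result

worker = Worker()
result = asyncio.run(stopping_middleware(process_job, worker, {"fatal": True}))
```

Because the stop is a method call rather than an exception, the job that triggered it still finishes with a well-defined status.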

@medihack medihack linked a pull request Feb 3, 2025 that will close this issue