Lite distributed task queue using Google Cloud Platform (GCP) Pub/Sub
Like Celery, but lighter: this package lets you run operations asynchronously in your Flask project, without a choice of broker, as it only uses GCP Pub/Sub.
Technically, this package can run without Flask; historically, it was created as a quick win for migrating an existing Flask + Celery project to GCP Cloud Run with the Pub/Sub system.
This package aims to remove some painful tasks by:
- Creating one dedicated topic for each function
- Creating one dedicated reusable subscription for each function
We do not recommend this package for the following cases:
- You need to reuse your development in a multi-cloud context
- You have a high volume of messages to process (not tested at scale)
This package is provided "as is", without guarantees, under the GPLv3 License.
- A Google Cloud account
- A GCP project (here to create a new one), with the Pub/Sub API enabled (take care to select the correct project)
- A Service Account with the Pub/Sub Admin role, and its credentials JSON file (`creds.json` in the examples below)
- A local environment with Python >= 3.9
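One possible way to create such a Service Account and its key is with the gcloud CLI. This is only a sketch: the account name `flask-pubsub` and the project ID `my-project` are placeholders to adapt to your setup.

```shell
# Create a dedicated Service Account (names are examples, adapt them)
gcloud iam service-accounts create flask-pubsub \
    --project=my-project

# Grant it the Pub/Sub Admin role on the project
gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:flask-pubsub@my-project.iam.gserviceaccount.com" \
    --role="roles/pubsub.admin"

# Download the JSON credentials file referenced in the examples below
gcloud iam service-accounts keys create ./creds.json \
    --iam-account=flask-pubsub@my-project.iam.gserviceaccount.com
```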
```shell
pip install flask-gcp-pubsub
```
`demo.py`:

```python
#!/usr/bin/env python
# coding: utf-8
from flask import Flask, make_response
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)

@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'

@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)
```
WARNING: do not forget to replace `<project_id>` with your GCP project ID (not the project number) and to download the JSON-formatted key from the GCP Console.
`wsgi.py`:

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Imports
from demo import app

# Start
if __name__ == '__main__':
    app.run()
```
`wsgi_delayed.py`:

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Imports
from demo import pubsub

# Start
if __name__ == '__main__':
    pubsub.run()
```
This command will launch the Flask server:

```shell
flask run --port 9090
```
This command will launch the asynchronous tasks manager:

```shell
python wsgi_delayed.py
```
You can now navigate to http://localhost:9090/test. If everything goes well, check the console output, which should look something like this:

```
Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05
```
You can also create a task based on GCP Storage, by receiving a notification for any supported event on a bucket.
```python
@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)

@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)
```
For the Storage product specifically, Google creates a dedicated Service Account for these actions, which you cannot choose. You can find it here.
You have to add the Pub/Sub Admin role to that particular Service Account in IAM.
The kwargs contain all attributes of the Pub/Sub notification.
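As an illustration, a handler could read those attributes like this. This is a standalone sketch: the attribute names follow the Cloud Storage Pub/Sub notification format, but the values (bucket and object names) are hypothetical.

```python
# Hypothetical attributes as delivered with a GCS OBJECT_FINALIZE notification
# (attribute names per the Cloud Storage Pub/Sub notification format)
attrs = {
    'eventType': 'OBJECT_FINALIZE',
    'bucketId': 'bucket-flask-gcp',
    'objectId': 'reports/daily.csv',
    'payloadFormat': 'JSON_API_V1',
}

def describe_event(**kwargs):
    """Build a one-line summary from the notification attributes."""
    return '{} on gs://{}/{}'.format(
        kwargs.get('eventType'), kwargs.get('bucketId'), kwargs.get('objectId'))

print(describe_event(**attrs))
# OBJECT_FINALIZE on gs://bucket-flask-gcp/reports/daily.csv
```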
If you change the function name, the auto-clean included at start-up cannot work. As you cannot exceed 10 notification configurations per bucket, do not forget to clean up previous subscriptions with these commands:

```shell
gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>
```
Configuration can be done using keyword arguments at class instantiation and/or Flask configuration variables (set with `config.update`).
If both methods are used for the same configuration key, the class instantiation takes precedence.
| Flask env variable | Keyword argument | Usage | How-to get? |
|---|---|---|---|
| `PUBSUB_PROJECT_ID` | `project_id` | GCP project ID | See console.cloud.google.com |
| `PUBSUB_CREDENTIALS_JSON` | `gcp_credentials_json` | Service account credentials, as a JSON string | See IAM in console.cloud.google.com |
| `PUBSUB_CREDENTIALS_FILE` | `gcp_credentials_file` | Service account credentials, as a local JSON file | See IAM in console.cloud.google.com |
| `PUBSUB_CONCURRENT_CONSUMERS` | `concurrent_consumers` | Number of simultaneous consumers (default: `4`) | |
| `PUBSUB_CONCURRENT_MESSAGES` | `concurrent_messages` | Number of messages pulled from the topic per consumer per call (default: `2`) | |
| `PUBSUB_TOPIC_PREFIX` | `topic_prefix` | Prefix for all topics used in the instance, useful for feature branches sharing the same project | |
| `PUBSUB_AUTO_SETUP` | `auto_setup` | Enable auto-setup for topic creation and bucket notifications to Pub/Sub (default: `false`) | |
| `PUBSUB_DEADLINE` | `deadline` | Retry deadline for all Pub/Sub operations (default: `300`) | |
| `PUBSUB_PULL_RETURN_IMMEDIATELY` | `return_immediately` | Enable the return-immediately flag when pulling (default: `false`) | |
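For example, the same settings could be passed through the Flask configuration instead of keyword arguments. This is only a sketch: the key names come from the table above, and the values are placeholders.

```python
from flask import Flask
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
# Keys from the configuration table above; values are examples to adapt
app.config.update(
    PUBSUB_PROJECT_ID='<project_id>',
    PUBSUB_CREDENTIALS_FILE='./creds.json',
    PUBSUB_CONCURRENT_CONSUMERS=8,
)
pubsub = PubSub(app)
```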
- Priority in the treatment of messages per function
- Logging instead of print (+ option to format as JSON)
- Contributing manual
- Documentation about Flask configuration keys and their counterparts in direct Pub/Sub calls
- Region selection (default: all regions); for the moment, it can be edited in the storage rules of the topic