After I published my article on using Celery with Flask, several readers asked how this integration can be done when using a large Flask application organized around the application factory pattern. It's a very good question, as it is non-trivial to make Celery, which does not have a dedicated Flask extension, delay access to the application until the factory function is invoked.
I know some of you are impatient, so let me direct you to the GitHub repository that has the modified Flasky application described in this article: http://github.com/miguelgrinberg/flasky-with-celery.
The first two commits in this repository import the Flasky application, as featured in my book. The changes to add Celery are all contained in the third and last commit.
Creating the Celery instance
The first problem that presents itself is how to create the `celery` object, which provides the `celery.task` decorator. Since it provides the decorator, it has to be created as a global variable, and that implies that the Flask application instance is not going to be around when it is created.
Here is how I initialized the Celery instance in the single file application:
```python
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
```
So this is a big problem, as I'm using `app` all over the place here. To adapt this bit of code to Flasky I had to get a bit creative. Here is what I did:
```python
from celery import Celery
from config import config, Config

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_name):
    # ...
    celery.conf.update(app.config)
    # ...
    return app
```
The solution involves separating the creation and the configuration of the `celery` instance. I create the object as a global variable, but I delay its configuration until `create_app()` is invoked.
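The same deferred-configuration idea can be shown in isolation. Below is a stdlib-only sketch, not the article's actual code; `LazyService` and its method names are invented for illustration. The point is that the object exists at import time, so decorators can use it, while its settings arrive later from the factory:

```python
class LazyService:
    """Stand-in for the global celery object: usable at import
    time for decorating functions, configured later by the
    application factory."""
    def __init__(self):
        self.config = {}
        self.tasks = {}

    def task(self, func):
        # The decorator works before any configuration happens,
        # which is why the object must be a module-level global.
        self.tasks[func.__name__] = func
        return func

    def configure(self, settings):
        self.config.update(settings)

service = LazyService()  # module-level, like the global celery instance

@service.task
def greet(name):
    return 'Hello, ' + name

def create_app(settings):
    # The factory configures the already-created global object,
    # mirroring celery.conf.update(app.config) above.
    service.configure(settings)
    return settings

create_app({'BROKER_URL': 'redis://localhost:6379/0'})
```

The decorator registers `greet` at import time, yet the broker setting only becomes available once `create_app()` runs, exactly the split used for the real `celery` instance.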
To make this work, I had to remove all references to `app` from the object creation. Instead of `app.name` I used `__name__`, which is what `app.name` will be initialized to later, when the app factory function is called. The only configuration item that needs to be passed during creation is the URL of the broker, so to get that item before the application exists I had to import it directly from the `Config` class. The one problem this creates is that it is not possible to have different brokers in different configurations; this item is fixed across all of them.
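One way to soften that limitation is to have the `Config` class read the broker URL from an environment variable. This is a sketch of my own, not part of the original application: the `CELERY_BROKER_URL` attribute name matches the code above, but the environment-variable fallback is an added suggestion:

```python
import os

class Config:
    # Each deployment can point at a different broker by setting
    # CELERY_BROKER_URL in the environment; the code itself stays
    # fixed, with a local Redis instance as the default.
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL',
                                       'redis://localhost:6379/0')
```

The class attribute is still a single value shared by all configuration classes, but it is now chosen per deployment rather than hardcoded.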
The configuration portion is very easy. In the application factory function the application is available, so configuration works exactly as in the single file case.
Sending Asynchronous Emails Through Celery
To test this setup I converted the thread based email sending function to use Celery. This was surprisingly easy to do. Here is the relevant code:
```python
from . import celery

@celery.task
def send_async_email(msg):
    mail.send(msg)

def send_email(to, subject, template, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)
    send_async_email.delay(msg)
```
Here I simply decorate the function that sends the email with `celery.task`, and then invoke it using the `delay()` method. In the thread based version the main thread passed the `app` variable to the background thread so that it could set up an application context (required by Flask-Mail), but I have removed that here, because passing an application instance to the Celery worker process doesn't make much sense. Instead, I want the worker to have its own Flask application, like I did in the single file example.
Setting Up The Celery Worker
The only remaining task is to launch a Celery worker. This process needs to have its own Flask application instance that can be used to create the context necessary for the Flask background tasks to run. For this I used a separate starter script, which I called celery_worker.py:
```python
#!/usr/bin/env python
import os
from app import celery, create_app

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
app.app_context().push()
```
This little script creates a Flask application and pushes an application context, which will remain set through the entire life of the process. Celery also needs access to the `celery` instance, so I imported it from the `app` package.
If you have an activated virtual environment, now you can start the Celery worker with the following command:
```
(venv) $ celery worker -A celery_worker.celery --loglevel=info
```

(Note that in Celery 5 and newer the argument order changed, and the `-A` option must come before the subcommand: `celery -A celery_worker.celery worker --loglevel=info`.)
If you now start a Redis service and the Flasky application, everything should be working.
I hope this clarifies the setup of Celery, but if there are any remaining questions feel free to let me know below in the comments. If you want step-by-step instructions on how to run the example code, see the README file on the GitHub repository.