Celery and the Flask Application Factory Pattern

After I published my article on using Celery with Flask, several readers asked how this integration can be done when using a large Flask application organized around the application factory pattern. It's a very good question, as it is non-trivial to make Celery, which does not have a dedicated Flask extension, delay access to the application until the factory function is invoked.

In this article I'm going to describe in detail how I added Celery to Flasky, the application featured in my Flask book.

The Code

I know some of you are impatient, so let me direct you to the GitHub repository that has the modified Flasky application described in this article: http://github.com/miguelgrinberg/flasky-with-celery.

The first two commits in this repository import the Flasky application, as featured in my book. The changes to add Celery are all contained in the third and last commit.

Creating the Celery instance

The first problem that presents itself is how to create the celery object, which provides the celery.task decorator. The fact that it provides the decorator means that it has to be created as a global variable, and that implies that the Flask application instance is not going to be around when it is created.

Here is how I initialized the Celery instance in the single file application:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

So this is a big problem, as I'm using app all over the place here. To adapt this bit of code to Flasky I had to get a bit creative. Here is what I did:

from celery import Celery
from config import config, Config

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_name):
    # ...
    celery.conf.update(app.config)
    # ...
    return app

The solution involves separating the creation and the configuration of the celery instance. I create the object as a global variable, but I delay its configuration until create_app() is invoked.

To make this work, I had to remove all references to app in the object creation. Instead of app.name I used __name__, which is what app.name will be initialized to later when the app factory function is called. The only configuration item that needs to be passed during creation is the URL of the broker, so to get that item before the application exists I had to import it directly from the Config class. The one problem that this creates is that it is not possible to have different brokers in different configurations; this item is fixed for all of them.
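For the import above to work, the broker URL has to be a class attribute of the base Config class. The following is a minimal sketch of what that part of config.py could look like, not the actual file from the repository. The Redis URL matches the one that appears in the worker log in the comments, while the environment variable overrides are an assumption added for illustration, as one way to vary the broker per deployment even though it cannot vary per configuration class.

```python
import os


class Config:
    # Class attributes can be read at import time, before any Flask
    # application exists, which is what Celery(__name__, broker=...) needs.
    # The environment variable overrides are an assumption for illustration.
    CELERY_BROKER_URL = os.environ.get(
        'CELERY_BROKER_URL', 'redis://localhost:6379/0')
    CELERY_RESULT_BACKEND = os.environ.get(
        'CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')


class DevelopmentConfig(Config):
    DEBUG = True


class TestingConfig(Config):
    TESTING = True


# Maps the name passed to create_app() to a configuration class.
config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'default': DevelopmentConfig,
}
```

Note that overriding CELERY_BROKER_URL in a subclass would not change the broker passed to Celery() at import time, because that call reads the attribute from the base class.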

The configuration portion is very easy. In the application factory function the application is available, so configuration works exactly as in the single file case.

Sending Asynchronous Emails Through Celery

To test this setup I converted the thread based email sending function to use Celery. This was surprisingly easy to do. Here is the relevant code:

from flask import current_app, render_template
from flask_mail import Message
from . import celery, mail

@celery.task
def send_async_email(msg):
    mail.send(msg)

def send_email(to, subject, template, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)
    send_async_email.delay(msg)

Here I simply decorate the function that sends the email with celery.task, and then invoke it using the delay() method. In the thread based version the main thread passed the app variable to the background thread so that it can set up an application context (required by Flask-Mail), but I have removed that here because passing an application instance to the Celery worker process doesn't make much sense. Instead I want the worker to have its own Flask application, like I did in the single file example.

Setting Up The Celery Worker

The only remaining task is to launch a Celery worker. This process needs to have its own Flask application instance that can be used to create the context necessary for the Flask background tasks to run. For this I used a separate starter script, which I called celery_worker.py:

#!/usr/bin/env python
import os
from app import celery, create_app

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
app.app_context().push()

This little script creates a Flask application and pushes an application context, which will remain set through the entire life of the process. Celery also needs access to the celery instance, so I imported it from the app package.
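A process-wide context is not the only option. As discussed in the comments below, the context can instead be entered inside the task itself. The following is a minimal sketch of that idea, not code from the repository: with_app_context and FakeApp are hypothetical names, and FakeApp only stands in for a Flask application so that the example runs without Flask installed.

```python
import functools
from contextlib import contextmanager


def with_app_context(get_app):
    """Run the decorated function inside get_app().app_context()."""
    def decorator(func):
        @functools.wraps(func)  # the wrapper keeps the original function name
        def wrapper(*args, **kwargs):
            with get_app().app_context():
                return func(*args, **kwargs)
        return wrapper
    return decorator


# Stand-in for a Flask application (hypothetical, for illustration only).
# A real Flask app already provides app_context() as a context manager.
class FakeApp:
    def __init__(self):
        self.context_depth = 0

    @contextmanager
    def app_context(self):
        self.context_depth += 1
        try:
            yield self
        finally:
            self.context_depth -= 1


fake_app = FakeApp()


@with_app_context(lambda: fake_app)
def send_async_email(msg):
    # In the real task this line would be mail.send(msg).
    return (msg, fake_app.context_depth)
```

With a real application, the callable passed to with_app_context would return a Flask app built by create_app(), ideally created once per worker process rather than on every call, and celery.task would be applied above this decorator.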

If you have an activated virtual environment, now you can start the Celery worker with the following command:

(venv) $ celery -A celery_worker.celery worker --loglevel=info

If you now start a Redis service and the Flasky application, everything should be working.
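Since both the worker and the application fail with a "Connection refused" error when Redis is not running, it can help to check the broker address before starting them. This is a small standard-library sketch: broker_address and broker_reachable are hypothetical helper names, and the default port of 6379 is the usual Redis port, assumed here.

```python
import socket
from urllib.parse import urlsplit


def broker_address(url, default_port=6379):
    """Return (host, port) from a URL such as redis://localhost:6379/0."""
    parts = urlsplit(url)
    return parts.hostname or 'localhost', parts.port or default_port


def broker_reachable(url, timeout=1.0):
    """Return True if a TCP connection to the broker host and port succeeds."""
    try:
        with socket.create_connection(broker_address(url), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution errors.
        return False
```

For example, broker_reachable('redis://localhost:6379/0') should return True once the Redis service is up.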

Conclusion

I hope this clarifies the setup of Celery, but if there are any remaining questions feel free to let me know below in the comments. If you want step-by-step instructions on how to run the example code, see the README file in the GitHub repository.

Miguel

78 comments
  • #1 Rahul said

    I get the following error whenever I try to start the Celery worker

    celery@qasim v3.1.18 (Cipater)
    Linux-3.16.0-30-generic-x86_64-with-Ubuntu-14.04-trusty

    [config]
    .> app: app:0x7f0a461c5bd0
    .> transport: redis://localhost:6379/0
    .> results: redis://localhost:6379/0
    .> concurrency: 8 (prefork)

    [queues]
    .> celery exchange=celery(direct) key=celery

    [tasks]
    . app.email.send_async_email

    [2015-05-20 17:09:41,091: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0: Error 111 connecting to localhost:6379. Connection refused..
    Trying again in 2.00 seconds...

    And if I ignore it and try to register a new user, I get ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

    I am using the Flasky app while doing your book.

  • #2 Miguel Grinberg said

    @Rahul: did you install Redis? That's required for the application to communicate with the Celery worker.

  • #3 Patricio said

    Hi Miguel !
    I have a short deadline to develop a web application to perform civil data registration and document emission, coupled with external biometric systems and some local quality control steps.
    On the app will reside some processes which should be defined by some workflow layer.
    On the workers or tasks I should be able to have really different stuff like operator input (enter data and capture biometrics), long unmanaged operations (like the biometric identification performed on external system which should return in some way the id of the record) and local specific stuff, like quality controls, printing of documentation, etc...
    I've been reading on the internet, and would hopefully use a combination of Flask, Celery/RabbitMQ and some Workflow basic lib. However I don't have any experience with these technologies and would like to know if you see something blocking, or have some recommendations or libs ?
    I have read that Celery via Canvas can implement simple workflow task sequences with basic control ?
    I also dont know if a Celery task could be a user input or sequence of input and validations ?
    Thank you a lot for any clues !

  • #4 Miguel Grinberg said

    @Patricio: Celery tasks are usually things that can be done unattended in the background, not tasks that require user input. Your main application should talk to users.

  • #5 Julien said

    Hi Miguel !

    I have a question regarding celery and global variables. How can you access one from the celery app ? I'm trying to pass a global variable Queue() to the task. Inside the task it seems to work as the Queue() grows, but when I try to access the same variable from outside the Queue() to consume it I always find that the Queue() is empty.

    Do you have any thoughts on that problem ? Cheers !

  • #6 Miguel Grinberg said

    @Julien: the Celery workers live in their own process, so you can't share global variables with your Flask application. You can pass values back to the application as return values, or set up a shared storage that works across processes, maybe something like Redis or a SQL database might work.

  • #7 Patricio said

    @Miguel: given the fact that a user input is not to be seen as a celery task, then using Canvas to create a kind of workflow wouldnt work ? Will I need a library that sequences the successive tasks without using Canvas ?
    Thank you again !

  • #8 Miguel Grinberg said

    @Patricio: unfortunately I don't understand what you need to do, so I can't answer your question. Sorry.

  • #9 Elmer said

    I have this error on running:
    return self.__local()
    File "C:\Python27\lib\site-packages\flask\globals.py", line 34, in _find_app
    raise RuntimeError('working outside of application context')
    How can I solve this?
    Thanks

  • #10 Miguel Grinberg said

    @Elmer: look at my previous Celery article. The email task is also one that needs an application context, you can use the same solution for your problem.

  • #11 Mirek said

    @Miguel: I have exactly the same problem as @Elmer. I looked at the previous Celery article, however, I didn't find any solution. :/ The problem is that I don't know, how to get to the current_app or application context in send_async_email function. Everything I use leads to 'working of outside of application context'. (e.g. print current_app)

    How could that be solved, please? Do you have any working code?

    I even tried running your code from github with the same result.

    Thank you for suggestions.

  • #12 Miguel Grinberg said

    @Mirek: I think you must be doing something wrong when you use my example and get an error. As I mentioned in previous comments, the Celery worker needs an application instance created. This is obviously not the same application instance from the main process, but it is configured the same, so any extensions that need to look up configuration variables or need an app context for other reasons can use the auxiliary instance.

    If you want me to help diagnose your error, please post the stack trace that you get when using my code example.

  • #13 vic said

    Hello Miguel! First of all I would like to thank you for awesome articles!
    In my smart home project I use a Raspberry, with Flask as the web server. For background operations I have created a single thread, which periodically reads the temperature from the sensors and saves the data to the DB (and does similar work for other devices).
    1. What is the best way to do it? I am thinking of using a Celery periodic task.
    2. If it is a Celery periodic task: how can I send data from the app to this task?
    3. If it is a Celery periodic task: can I use the app's SQLAlchemy session in the Celery task? Or should I create separate sessions for the app and the task thread?
    4. How can I configure things (maybe in SQLAlchemy) so that a callback function is called on the thread side when something is changed on the app side?
    Thanks!

  • #14 Miguel Grinberg said

    @vic: if you need just a single background job for the entire application, then Celery might be overkill, you can just start a thread in your Flask process instead. To access the database from a thread or process you will need to create application and database instances (if you are using a separate process for your background job), and then inside an application context you can work with the database like you do it during a request.

  • #15 Halcyon said

    Miguel, how come I get this error? Attempted to generate a URL without the application context being pushed.

    this is my worker:

    def generate_token_and_send_mail(email, confirm_route=None, confirm_template=None):
        token = generate_confirmation_token(email)
        confirm_url = url_for(confirm_route, token=token, _external=True)
        html = render_template(confirm_template, confirm_url=confirm_url)
        subject = "Please confirm your email"
        send_email(email, subject, html)

    Isn't celery_worker.py pushing the context? My celery_worker.py script is just like yours.

  • #16 Miguel Grinberg said

    @Halcyon: Hard to know without seeing the actual code. Sounds like you need to debug and figure out why the context wasn't pushed.

  • #17 Alejo said

    Hi Miguel,

    What's the purpose of the separate starter script for celery? Why can't I start the worker with something like 'celery worker -A app.celery --loglevel=info'?

  • #18 Miguel Grinberg said

    @Alejo: that's exactly what I'm doing. There is no starter script, the script that I use goes after the -A argument and creates an app instance and sets its context.

  • #19 Marcel Hekking said

    Nice article! Thanks. I implemented your code and everything works fine. However, as soon as I start working with a database connection I get the error "RuntimeError: application not registered on db instance and no application bound to current context". Should I create separate database connections for the celery app that is created in 'celery_worker.py'?

  • #20 Miguel Grinberg said

    @Marcel: If the celery worker needs to access the database, then you need to initialize your db instance in the same way you do in your main application. That means you will need to push an application context, so that Flask-SQLAlchemy can initialize. This is done in the celery_worker.py file, you can see how I did it in this article.

  • #21 gledi said

    @Miguel

    I tried this configuration and added a task to send an email via Flask-Mail.

    I ran my app on CentOS 7 and Windows 10 and on Linux everything worked just fine while on Windows I received a RuntimeError: working outside of application context. I did not use cygwin, so I am not sure if it works under cygwin, but on Windows using both Python 2.7.11 and 3.5.1 (x64) downloaded from python.org this did not work.

    I also cloned the flasky-with-celery repo and I got the same outcome, so I am positive that the error did not just happen because I failed to reproduce your setup correctly.

    I am telling you this in case you receive any other complaints from people who can't get this configuration to work and they are using windows.

    Do you think this is something the Flask guys might be interested to know?

    PS: Thanks for the awesome tutorials, articles, extensions, book, ...

  • #22 Miguel Grinberg said

    @gledi: Something must have changed in recent versions of Celery. I will look into it, but in any case, the solution is to create an app instance and push its context inside the send_async_email function.

  • #23 Fuxin said

    Hi Miguel!
    I got the same error on Windows as gledi.
    It worked fine on Linux.
    Is app.app_context().push() not working on Windows?

    And thanks for your tutorials :)

  • #24 Miguel Grinberg said

    @Fuxin: As I mention above, it appears recent versions of Celery have changed something. If you move the pushing of the app context to the send_async_email function everything will work.

  • #25 Nikolay Golub said

    Miguel, thank you for a good article!

    Am I right, that it's impossible to create Celery instance without broker and set this option later?

    I'm trying to find a way to set up Celery tasks without circular imports and without hacky import calls inside functions.
