2015-05-16T02:09:49Z

Celery and the Flask Application Factory Pattern

After I published my article on using Celery with Flask, several readers asked how this integration can be done when using a large Flask application organized around the application factory pattern. It's a very good question, as it is non-trivial to make Celery, which does not have a dedicated Flask extension, delay access to the application until the factory function is invoked.

In this article I'm going to describe in detail how I added Celery to Flasky, the application featured in my Flask book.

The Code

I know some of you are impatient, so let me direct you to the GitHub repository that has the modified Flasky application described in this article: http://github.com/miguelgrinberg/flasky-with-celery.

The first two commits in this repository import the Flasky application, as featured in my book. The changes to add Celery are all contained in the third and last commit.

Creating the Celery instance

The first problem that presents itself is how to create the celery object, which provides the celery.task decorator. The fact that it provides the decorator means that it has to be created as a global variable, and that implies that the Flask application instance is not going to be around when it is created.

Here is how I initialized the Celery instance in the single file application:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

So this is a big problem, as I'm using app all over the place here. To adapt this bit of code to Flasky I had to get a bit creative. Here is what I did:

from celery import Celery
from config import config, Config

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_name):
    # ...
    celery.conf.update(app.config)
    # ...
    return app

The solution involves separating the creation and the configuration of the celery instance. I create the object as a global variable, but I delay its configuration until create_app() is invoked.

To make this work, I had to remove all references to app in the object creation. Instead of app.name I used __name__, which is what app.name will be initialized to later when the app factory function is called. The only configuration item that needs to be passed during creation is the URL of the broker, so to get that item before the application exists I had to import it directly from the Config class. The one problem that this creates is that it is not possible to have different brokers in different configurations; this item is fixed for all configurations.
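
For reference, here is a minimal sketch of what the relevant part of config.py might look like (the environment variable name and the default URL below are assumptions for illustration, not the exact values from the repository):

import os

class Config:
    # defined as a class attribute, so it can be imported before
    # any application instance exists
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') or \
        'redis://localhost:6379/0'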

The configuration portion is very easy. In the application factory function the application is available, so configuration works exactly as in the single file case.

Sending Asynchronous Emails Through Celery

To test this setup I converted the thread-based email sending function to use Celery. This was surprisingly easy to do. Here is the relevant code:

from flask import current_app, render_template
from flask_mail import Message

from . import celery, mail

@celery.task
def send_async_email(msg):
    mail.send(msg)

def send_email(to, subject, template, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)
    # note: passing the Message object relies on pickle serialization,
    # which was the Celery default when this article was written
    send_async_email.delay(msg)

Here I simply decorate the function that sends the email with celery.task, and then invoke it using the delay() method. In the thread-based version the main thread passed the app variable to the background thread so that it could set up an application context (required by Flask-Mail), but I have removed that here because passing an application instance to the Celery worker process doesn't make much sense. Instead I want the worker to have its own Flask application, like I did in the single file example.
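
For contrast, the thread-based version from the book looked more or less like this (abridged, so treat it as a sketch):

from threading import Thread

def send_async_email(app, msg):
    # the background thread sets up its own application context
    with app.app_context():
        mail.send(msg)

def send_email(to, subject, template, **kwargs):
    app = current_app._get_current_object()
    # ... build msg as before ...
    thr = Thread(target=send_async_email, args=[app, msg])
    thr.start()
    return thr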

Setting Up The Celery Worker

The only remaining task is to launch a Celery worker. This process needs to have its own Flask application instance that can be used to create the context necessary for the Flask background tasks to run. For this I used a separate starter script, which I called celery_worker.py:

#!/usr/bin/env python
import os
from app import celery, create_app

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
app.app_context().push()

This little script creates a Flask application and pushes an application context, which will remain set through the entire life of the process. Celery also needs access to the celery instance, so I imported it from the app package.

If you have an activated virtual environment, now you can start the Celery worker with the following command:

(venv) $ celery -A celery_worker.celery worker --loglevel=info

If you now start a Redis service and the Flasky application, everything should be working.
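
As a quick checklist, running everything takes roughly three terminals, along these lines (assuming a local Redis install; the manage.py launcher is the one used by the book's version of Flasky):

$ redis-server
(venv) $ celery -A celery_worker.celery worker --loglevel=info
(venv) $ python manage.py runserver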

Conclusion

I hope this clarifies the setup of Celery, but if there are any remaining questions feel free to let me know below in the comments. If you want step-by-step instructions on how to run the example code, see the README file in the GitHub repository.

Miguel

74 comments

  • #51 Jeff Thorne said 2017-04-04T12:31:34Z

    Thanks for the article and great book Miguel. Very helpful. I have all of this working from the command line and am now trying to daemonize celery with an upstart script. I use SQLAlchemy in my celery task and keep getting the following error: OperationalError('(psycopg2.OperationalError) could not connect to server'). Is there anything I would need to add to celery_worker?

    Cheers, Jeff

  • #52 Miguel Grinberg said 2017-04-04T18:16:45Z

    @Jeff: How do you pass the database connection URL to your Celery workers?

  • #53 WTRipper said 2017-09-18T12:43:11Z

    Hello Miguel, thank you for this nice tutorial! How would one use the Flask logger inside a Celery task?

  • #54 Miguel Grinberg said 2017-09-18T21:41:32Z

    @WTRipper: The app.logger object should be available in the Celery worker. If you have an application context set up for your task, then use "current_app.logger" to access it.
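
    For illustration, a minimal sketch (the task name is made up; this assumes the celery_worker.py setup from the article, which pushes an application context for the worker process):

    from flask import current_app

    @celery.task
    def some_task():
        # valid because celery_worker.py pushed an application context
        current_app.logger.info('task started')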

  • #55 Dmitry Moroz said 2018-03-06T16:32:02Z

    How can this work? I get the following error:

    EncodeError: is not JSON serializable

  • #56 Miguel Grinberg said 2018-03-06T17:27:04Z

    @Dmitry: Newer versions of Celery that came after this article was published use JSON as the default serialization mechanism. Back when I did this, Celery used pickle by default. You have to either downgrade Celery to version 3 (the one that uses pickle), or if you want to continue using version 4, you have to configure it to use pickle instead of JSON.
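
    For example, a minimal sketch of that configuration (these are standard Celery settings, shown here for illustration):

    celery.conf.update(
        task_serializer='pickle',
        accept_content=['pickle'],
        result_serializer='pickle',
    )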

  • #57 Igor said 2018-03-13T13:59:06Z

    Hello Miguel, thanks for another great article!

    How would you recommend running celery on a production server? At the moment I'm doing it this way:

    celery worker -A celery_worker.celery -B -f celery.log --loglevel=info --detach

    Is there something wrong with doing it this way? I saw in the official docs that they recommend daemonizing it, but I could not get it to start with the method they proposed. This method, however, works. But I'm wondering if there are any hidden/potential gotchas with this method? Thanks

  • #58 Miguel Grinberg said 2018-03-13T18:53:07Z

    @Igor: The "running as a daemon" recommendation does not change how you start the process. The point of daemonizing it is that should the worker processes die, they will be restarted. You can continue to start the workers in the same way, but adding a systemd or supervisord script will ensure that these processes are always running.

  • #59 Ravi said 2018-04-10T10:36:37Z

    Hi Miguel,

    I have a Flask application that uses the Gunicorn webserver and nginx as a reverse proxy. I use the command below to run the webserver:

        exec gunicorn --timeout 300 -c "$ROOT"/deploy/gunicorn.conf.py deploy:app "$@"

    I use gunicorn.conf.py to specify settings such as worker_class, workers, etc.

    I need to send some background tasks to Celery from my Flask application. For that, I need to create a Celery instance integrated with the Flask app context, as you mentioned above. Now, in order to run the Celery worker, I will use the command:

        celery worker -A celery_worker.celery --loglevel=info

    This command does not incorporate my Gunicorn settings. Also, is there any way I can run my Flask application and the Celery worker as different services and still be able to send the background jobs?

  • #60 Miguel Grinberg said 2018-04-10T21:55:32Z

    @Ravi: I don't quite follow what you are asking. I suggest you give the code presented in this article a try. I think that will help you visualize how a Flask + Celery solution can be implemented.

  • #61 Cabe said 2019-07-05T21:49:36Z

    I'm using Celery 4.3 and RabbitMQ. I keep getting the following error message after the celery worker receives the task:

    [2019-07-05 16:28:56,669: INFO/MainProcess] Received task: app.email.send_async_email[a0026ce1-7090-4eec-95ba-498109ebc6b1]
    [2019-07-05 16:28:56,689: WARNING/ForkPoolWorker-2] send:
    [2019-07-05 16:28:56,689: WARNING/ForkPoolWorker-2] 'ehlo 132.11.168.192.in-addr.arpa\r\n'
    [2019-07-05 16:28:56,695: ERROR/ForkPoolWorker-2] Task app.email.send_async_email[a0026ce1-7090-4eec-95ba-498109ebc6b1] raised unexpected: SMTPServerDisconnected('please run connect() first')

    Here's my email.py:

    from flask import render_template
    from flask_mail import Message
    from . import celery
    from . import mail
    from config import Config

    @celery.task
    def send_async_email(to, subject, email_body, email_html):
        msg = Message(Config.MAIL_SUBJECT_PREFIX + subject,
                      sender=Config.MAIL_SENDER, recipients=[to])
        msg.body = email_body
        msg.html = email_html
        mail.send(msg)

    def send_email(to, subject, template, **kwargs):
        email_body = render_template(template + '.txt', **kwargs)
        email_html = render_template(template + '.html', **kwargs)
        send_async_email.delay(to, subject, email_body, email_html)

    Any idea what the issue could be? The app sends emails just fine without Celery, but when I add the .delay I am getting this every time.

  • #62 Miguel Grinberg said 2019-07-06T13:40:40Z

    @Cabe: I can't be sure, but my guess is that your Flask app context isn't properly configured, so the settings of your email server aren't correct in the Celery worker.

  • #63 Matthias said 2019-11-16T15:49:56Z

    Hi Miguel, first of all, thank you so much for this article. It helps me a lot. I have one problem: I initialize an MQTT client in the create_app method. The problem is that when I start my Flask app, two MQTT clients are started, because the celery_worker starts its own MQTT client since it also uses the create_app method. How can I solve this?

    def create_app(config):
        app = Flask(__name__)
        app.config.from_object(config.DevelopmentConfig)

        mqtt.init_app(app)
        db.init_app(app)
        api.init_app(app)
        ma.init_app(app)
        init_celery(celery, app)
        thread1 = MqttConsumer()
        thread1.daemon = True
        thread1.start()
        return app

  • #64 Miguel Grinberg said 2019-11-16T19:23:41Z

    @Matthias: add an argument to your create_app that determines if mqtt is initialized or not. For example:

    def create_app(mqtt=False):
        # ...
        if mqtt:
            thread1 = MqttConsumer()
        # ...

    Then you should only pass mqtt=True from the main app.
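
    For illustration (a sketch, with celery_worker.py as in the article):

    # main application script
    app = create_app(mqtt=True)

    # celery_worker.py keeps the default, so no MQTT client starts
    app = create_app()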

  • #65 Matthew said 2021-04-09T03:29:54Z

    This (or at least the command to run celery) seems to be out of date as of Celery 5.0

    When running: celery worker -A celery_worker.celery --loglevel=info

    I receive the warning and error:

    You are using `-A` as an option of the worker sub-command:
    celery worker -A celeryapp <...>
    
    The support for this usage was removed in Celery 5.0. Instead you should use `-A` as a global option:
    celery -A celeryapp worker <...>
    Usage: celery worker [OPTIONS]
    Try 'celery worker --help' for help.
    
    Error: no such option: -A
    
  • #66 Miguel Grinberg said 2021-04-09T23:12:36Z

    @Matthew: thanks for letting me know. Updated the blog post.

  • #67 Francisco Pires Costa said 2021-04-25T04:41:41Z

    Really interesting guide here, so many thanks for this. I have tried to follow this tutorial, but somehow the celery task does not have access to the Flask application context, even when using with current_app.app_context():

    mail.send(msg) raises RuntimeError: working outside of application context.

    From reading many forums, this seems to be a recurring problem.

    In the meanwhile I found the issue in your flasky GitHub repository where you discuss the only solution so far that works, after checking many available ideas in several sources: https://github.com/miguelgrinberg/flasky-with-celery/issues/12

    It consists of injecting the Flask context using a new application instance in each task. So in tasks.py, declare a function like this:

    def inject_flaskcontext():
        flask_app = create_app(CurrentConfig)
        return flask_app.app_context().push()
    

    And in each celery task add:

    @celery_app.task
    def dosomething(args):
        inject_flaskcontext()    # <---- here
        # do something using SQLAlchemy, Flask-Mail, Flask contexts, and so on.
    

    Looking at the definition of application dispatching, this looks similar: https://flask.palletsprojects.com/en/1.1.x/patterns/appdispatch/#application-dispatching

    Since each task is supposed to run in a separate, encapsulated environment, the fact that it creates a new Flask app could be fine. Still, I would like to ask if this is scalable, and what the implications would be of hundreds of users generating hundreds of different task instances like this. Furthermore, the generated applications would only be around while each task is actually running (not while queued), and would be cleaned up once the task completes, which in theory minimizes any issue that could arise. So far this is the only reasonable and understandable solution I have found, so without further information this really does sound like the solution. Cheers.

  • #68 Miguel Grinberg said 2021-04-25T16:00:15Z

    @Francisco: you adapted the solution from that issue and made a mistake on the way. You are pushing an app context but never popping it when you are done. That is not a good idea; the application context stack will continue to grow and grow if you don't remove your contexts when you are done using them.

    Besides that, I suggest you create a global app instance, and only make the app context local to the task. That will be more optimal.
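
    For illustration, a minimal sketch of that suggestion (reusing the names from the comments above):

    # created once, at module level
    flask_app = create_app(CurrentConfig)

    @celery_app.task
    def dosomething(args):
        # per-task context, popped automatically when the block exits
        with flask_app.app_context():
            # do something using SQLAlchemy, Flask-Mail, etc.
            pass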

  • #69 Francisco Pires Costa said 2021-04-26T10:10:11Z

    Howdy, using new_app = create_app(config) at the top of tasks.py and then bringing new_app into the celery task is not working here, with celery (5.0.5) and flask (1.1.2). Importing it also seems not to work, besides adding circular imports.

    Only way that it is working is to actually create_app and push context like explained here: https://github.com/miguelgrinberg/flasky-with-celery/issues/12

    So it would be good if the context could be removed when the Celery task is done, maybe with something like db.session.close() or db.session.remove(), or by deleting or refreshing the application. Any recommendation for handling this context would be welcome.

  • #70 Francisco Pires Costa said 2021-04-26T10:22:28Z

    I have also tried replacing app_context().push() with app.app_context() in a with block, and it also works (although from the documentation it seems that using with app.app_context() is the same as calling push(), as long as the whole block runs inside the with statement).

    In case with app.app_context() is less intrusive, at the cost of a few extra lines the following can be done inside the celery task:

    a = create_app(CurrentConfig)
    with a.app_context():
        # do stuff including calling outer functions
        # using Flask-SQLAlchemy and Flask-Mail, even Flask-Babel using get_locale from the database
        db.session.close()
        del a
    

    Any feedback is welcome.

  • #71 Miguel Grinberg said 2021-04-27T22:18:16Z

    @Francisco: I think you are over-complicating this. The only thing that I mentioned is that you were pushing contexts into the stack and never popping them. That's the problem. There are two solutions for it.

    1) pop the context when you are done:

    app = create_app(CurrentConfig)
    ctx = app.app_context()
    ctx.push()
    
    # do stuff here
    
    db.session.close()
    ctx.pop()
    

    2) Use a context manager:

    a = create_app(CurrentConfig)
    with a.app_context():
         # do stuff here
         db.session.close()
    
  • #72 Francisco Pires Costa said 2021-04-28T12:12:57Z

    Very nice! Ok thanks!

  • #73 Apoorva said 2021-05-26T16:31:37Z

    I am getting an error (Object of type Message is not JSON serializable).

  • #74 Miguel Grinberg said 2021-05-26T19:20:23Z

    @Apoorva: you are sending data that is incompatible with JSON. As mentioned several times in the comments above, this article was written when pickle was Celery's default serialization format. Now Celery uses JSON by default, so you can only send primitive types to the task. Instead of passing the Message object, you have to pass its attributes, and then construct the Message object directly in the task.
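
    For illustration, a minimal sketch of that approach (imports as in the code in comment #61 above):

    @celery.task
    def send_async_email(to, subject, body, html):
        # rebuild the Message from primitive, JSON-friendly arguments
        msg = Message(subject, recipients=[to])
        msg.body = body
        msg.html = html
        mail.send(msg)

    # the caller passes only strings
    send_async_email.delay(to, subject, body, html)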
