2015-05-16T02:09:49Z

Celery and the Flask Application Factory Pattern

After I published my article on using Celery with Flask, several readers asked how this integration can be done when using a large Flask application organized around the application factory pattern. It's a very good question, as it is non-trivial to make Celery, which does not have a dedicated Flask extension, delay access to the application until the factory function is invoked.

In this article I'm going to describe in detail how I added Celery to Flasky, the application featured in my Flask book.

The Code

I know some of you are impatient, so let me direct you to the GitHub repository that has the modified Flasky application described in this article: http://github.com/miguelgrinberg/flasky-with-celery.

The first two commits in this repository import the Flasky application, as featured in my book. The changes to add Celery are all contained in the third and last commit.

Creating the Celery instance

The first problem that presents itself is how to create the celery object, which provides the celery.task decorator. The fact that it provides the decorator means that it has to be created as a global variable, and that implies that the Flask application instance is not going to be around when it is created.

Here is how I initialized the Celery instance in the single file application:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

So this is a big problem, as I'm using app all over the place here. To adapt this bit of code to Flasky I had to get a bit creative. Here is what I did:

from celery import Celery
from config import config, Config

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_name):
    # ...
    celery.conf.update(app.config)
    # ...
    return app

The solution involves separating the creation and the configuration of the celery instance. I create the object as a global variable, but I delay its configuration until create_app() is invoked.

To make this work, I had to remove all references to app in the object creation. Instead of app.name I used __name__, which is what app.name will be initialized to later when the app factory function is called. The only configuration item that needs to be passed during creation is the URL of the broker, so to get that item before the application exists I had to import it directly from the Config class. The one problem this creates is that it is not possible to have different brokers in different configurations; this item is fixed for all configurations.
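For reference, the relevant piece of config.py could look something like this. This is a minimal sketch, and the environment variable name and Redis default are my assumptions; the real Flasky config defines many more settings and several configuration classes:

```python
# Hypothetical minimal sketch of config.py; only the part relevant to
# Celery is shown, and the environment variable name is an assumption.
import os


class Config:
    # Celery reads this at import time, before the factory runs,
    # so it must live on the class itself
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL',
                                       'redis://localhost:6379/0')


# create_app() looks configurations up by name in this dictionary
config = {
    'default': Config,
}
```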

The configuration portion is very easy. In the application factory function the application is available, so configuration works exactly as in the single file case.

Sending Asynchronous Emails Through Celery

To test this setup I converted the thread-based email sending function to use Celery. This was surprisingly easy to do. Here is the relevant code:

from flask import current_app, render_template
from flask_mail import Message

from . import celery, mail

@celery.task
def send_async_email(msg):
    mail.send(msg)

def send_email(to, subject, template, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)
    send_async_email.delay(msg)

Here I simply decorate the function that sends the email with celery.task, and then invoke it using the delay() method. In the thread-based version the main thread passed the app variable to the background thread so that it could set up an application context (required by Flask-Mail), but I have removed that here because passing an application instance to the Celery worker process doesn't make much sense; instead, I want the worker to have its own Flask application, like I did in the single file example.

Setting Up The Celery Worker

The only remaining task is to launch a Celery worker. This process needs to have its own Flask application instance that can be used to create the context necessary for the Flask background tasks to run. For this I used a separate starter script, which I called celery_worker.py:

#!/usr/bin/env python
import os
from app import celery, create_app

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
app.app_context().push()

This little script creates a Flask application and pushes an application context, which will remain set through the entire life of the process. Celery also needs access to the celery instance, so I imported it from the app package.

If you have an activated virtual environment, now you can start the Celery worker with the following command:

(venv) $ celery -A celery_worker.celery worker --loglevel=info

If you now start a Redis service and the Flasky application, everything should be working.

Conclusion

I hope this clarifies the setup of Celery, but if there are any remaining questions feel free to let me know below in the comments. If you want step-by-step instructions on how to run the example code, see the README file on the GitHub repository.

Miguel

66 comments

  • #26 Miguel Grinberg said 2016-02-18T06:36:17Z

    @Nikolay: I believe Celery needs to know the broker from the start, yes. Not sure I understand where the circular import is. Can you show me an example?

  • #27 Jatayu said 2016-03-02T13:23:31Z

    Thanks Miguel.

    How many Flask app instances are running in this scenario?

    Is there a large Flask server running as the main application serving webpages/forms etc., and a separate Flask app running the Celery tasks?

    Or are they one and the same Flask instance?

    Is it even possible to have two Flask instances like I mentioned, and control Celery from the other Flask instance?

    What is the drawback of running Celery in the same Flask instance that serves the main web forms etc.?

    Thank you for any hints/answers.

  • #28 Jeremy said 2016-03-02T22:00:50Z

    I just want to say THANK YOU SOOOO MUCH!!!! I've just restructured an application to use application factories, blueprints and celery. The factories and blueprints took a bit of time to fully grasp, and a LOT of thinking about how to properly do this without circular imports. After I got the app fully configured, I thought I was done, until I tried to run the app and no celery tasks were running. I've spent hours of frustration on this. Scrapped the application factory idea completely. I even had come across this post and didn't take the time to fully read it like I should (frustration does weird things to the brain). I then fully re-read this blog post and placed the celery object in my app/__init__.py and configured it per your recommendations within the create_app function. I then just imported the celery object to my tasks module and created the celery_worker.py module per your requirements. I still don't fully understand why the need to import the celery object into the celery_worker since it's not being used, and I will continue to do research on this, but GEE GOLLY WILKERS BATMAN! IT WORKSSS!!! FINALLYYYY!!! THANK U THANK U THANK UUUUUUUU!!!! None of my ideas were working. Either creating circular imports or just flat out crashing. Not sure how you came up with this design but you're a GOD SEND!!!

  • #29 Miguel Grinberg said 2016-03-05T03:08:17Z

    @Jatayu: The main server has one application instance. Each Celery worker has its own application instance as well. You can't run Celery on the same app instance as the main application, since Celery workers run in separate processes.

  • #30 zou said 2016-08-29T08:40:42Z

    hi Miguel Grinberg, when I use:

        @celery.task(bind=True)
        def mysqlTomongoDB(self, id):
            job = JobInfo.query.filter_by(id=id).first()
            config = job.to_josn()

    I encountered this problem: RuntimeError: application not registered on db instance and no application bound to current context. I use celery and flask the same way you do.

  • #31 E Rodriguez said 2016-09-07T11:44:54Z

    Hi Miguel,

    How could I perform database operations in my celery tasks? I am struggling with the db session and app context :/

  • #32 Miguel Grinberg said 2016-09-10T17:42:17Z

    @zou: any time you use the database, you need to do so while an application context is active. This is how the database instance knows where to go look for the database settings you have in the configuration.

  • #33 Miguel Grinberg said 2016-09-10T17:52:18Z

    @E: you need to use an application instance to push an application context:

    with app.app_context():
        # you can use the db here

  • #34 TDog said 2016-11-07T22:36:38Z

    Hi Miguel,

    Thank you! The above steps seem to work fine, but Celery 3.1.24 gave me a warning about avoiding the pickle serializer for security reasons. As suggested I've configured it like this:

    CELERY_ACCEPT_CONTENT = ['json']

    But now when I attempt to run a task I get this:

    CRITICAL/MainProcess] Can't decode message body: ContentDisallowed('Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)',) [type:'application/x-python-serialize' encoding:'binary' headers:{}]

    I discovered I might have to configure it more like so:

    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'

    But I still get the same error as above. Do you have any advice?

  • #35 Miguel Grinberg said 2016-11-08T04:45:21Z

    @TDog: the error seems to suggest the Flask server is sending pickled tasks, which the Celery server rejects. Have you updated both sides with the configuration change to use json?

  • #36 TDog said 2016-11-11T06:42:15Z

    @Miguel: thanks, I was accidentally importing a random celery instance from another module, I suspect this was why.

  • #37 Andrey said 2017-01-29T12:35:34Z

    Hello, Miguel. Thank you for this article. Could you please say a couple of words (maybe another wonderful article..?) about the testing process?

  • #38 Miguel Grinberg said 2017-01-30T17:59:36Z

    @Andrey: What testing process? Do you mean in general? You may want to look at my Flack project on GitHub. This is a project with Celery integration and has good unit test coverage.

  • #39 Andrey said 2017-01-31T10:06:13Z

    I'll study your code on github.com better.

    Now, I'll try to explain what I do not understand yet. For example, we have this function "send_email" and use it in a view function. How can I test this view function? Do I need to start the Celery worker first? Is there any peculiarity in how to start it, where to start it, and with what app_context? Will the message be sent inside the test? Or how do I prevent sending the message while testing?

    And I have one more question about the "send_async_email" function in your example... It gets an instance of flask_mail.Message as its argument. I can't understand how it works without a conflict with the message serialization. I am trying to use Celery for the first time and writing my own code with no experience with it. That is why I have so many questions... I am sorry.

    And thank you in advance for your answers. They are very important.

  • #40 Miguel Grinberg said 2017-01-31T19:03:56Z

    @Andrey: I think you can go about it in different ways. My preference for testing code that launches Celery tasks is to test both sides independently. On the server side you can mock the Celery task. This eliminates the need to have Celery running during unit tests, which complicates things a lot.
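A minimal sketch of that mocking approach, using only the standard library; the task and view names below are made up for illustration, and the stand-in class merely mimics the delay() attribute a real Celery task object provides:

```python
from unittest import mock


class EmailTask:
    """Stand-in for a real Celery task object; only delay() matters here."""

    def delay(self, *args, **kwargs):
        raise RuntimeError('a real broker would be needed here')


send_async_email = EmailTask()  # in the real app this is a @celery.task


def register_user(email_address):
    # view-side code under test: it only *launches* the task
    send_async_email.delay(email_address)


# In a unit test, patch delay() so no broker or worker is required,
# and simply assert that the view fired the task with the right data.
with mock.patch.object(send_async_email, 'delay') as fake_delay:
    register_user('user@example.com')
    fake_delay.assert_called_once_with('user@example.com')
```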

    Regarding serialization, Celery provides several methods to serialize arguments. Back when I did this, pickle was the default. This method can serialize complex objects, so that worked well for me. These days, Celery changed the default, so pickle needs to be explicitly selected if you want to use it. Or as a better alternative, you can send the data as a dictionary and create the Message object directly on the Celery task.

  • #41 Andrey said 2017-02-01T13:56:43Z

    On the server side you can mock the Celery task.

    This magic is unreachable for me now. But I'll do my best. :)

    OK. Thank you very much, Miguel.

  • #42 Greg said 2017-02-03T21:30:17Z

    Hey Miguel!

    This is a great tutorial. I am using your application factory pattern and my Flask application runs as a wsgi application. I seem to be running into the following issue when I attempt to run Celery within the application context:

    [2017-02-03 16:24:27,915: CRITICAL/MainProcess] Unrecoverable error: ImportError('No module named http',)
    Traceback (most recent call last):
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/worker/worker.py", line 203, in start
        self.blueprint.start(self)
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/bootsteps.py", line 115, in start
        self.on_start()
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/apps/worker.py", line 143, in on_start
        self.emit_banner()
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/apps/worker.py", line 158, in emit_banner
        ' \n', self.startup_info(artlines=not use_image))),
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/apps/worker.py", line 221, in startup_info
        results=self.app.backend.as_uri(),
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/kombu/utils/objects.py", line 44, in __get__
        value = obj.__dict__[self.__name__] = self.__get(obj)
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/app/base.py", line 1182, in backend
        return self._get_backend()
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/app/base.py", line 900, in _get_backend
        self.loader)
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/app/backends.py", line 65, in by_url
        return by_name(backend, loader), url
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/celery/app/backends.py", line 45, in by_name
        cls = symbol_by_name(backend, aliases)
      File "/apps/env27/virtualenv/fl_program/lib/python2.7/site-packages/kombu/utils/imports.py", line 56, in symbol_by_name
        module = imp(module_name, package=package, **kwargs)
      File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/importlib/__init__.py", line 37, in import_module
        __import__(name)
    ImportError: No module named http

  • #43 Miguel Grinberg said 2017-02-04T07:58:07Z

    @Greg: looks like a Python 2 vs Python 3 issue. I recommend that you either use a newer version of Python (3.5 or 3.6), or else downgrade Celery to an older release. You can look at the requirements file in the GitHub project to see what version I used back when I wrote this article.

  • #44 Andrey said 2017-02-12T14:26:15Z

    Hello, Miguel. I am back, and I definitely need your help. Could you please help me?

    My problem is this: https://github.com/tgi-f/journal

    This is my current application at the start point. And actually it works well on my local machine with the development server. But I tried to deploy this application on VirtualBox with Ubuntu-16.04-server. And there this application shows CSRF Error.

    Please, help me understand what I did wrong, how to debug the error and fix it. More information about the deployment and the error I got is available here:

    http://avm.pythonanywhere.com/RJMjaB

    I am determined to answer any of your question and explain every letter in my code. Thank you in advance...

  • #45 Miguel Grinberg said 2017-02-12T16:47:47Z

    @Andrey: can't really tell, but you can debug this on your side. Just look at the requests that the browser is sending to verify whether the correct CSRF token is being sent. I think that should allow you to find the problem.

  • #46 Andrey said 2017-02-13T03:11:18Z

    Just look at the requests that the browser is sending to verify if the correct CSRF token is being sent.

    Actually it is being sent. There is a CSRF token in the request form and there are cookies in the request headers. But why do they not match? Something strange and devastating happens with the cookies. I have no idea what to do. Is the configuration of my app correct?

  • #47 Miguel Grinberg said 2017-02-13T06:07:06Z

    @Andrey: I don't see anything wrong in your configuration, at least nothing obvious. If you can see a CSRF token submitted with the form, then you will need to decode the session cookie and see what token the server was expecting. This isn't really that difficult: the server comes up with a token, puts it in the form that goes out to the client, and also in the session cookie; when the client submits the form back, the session and the form tokens must match. That's all there is to it, so you need to find where the disconnect is.
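For the step of looking inside the session cookie, here is a quick standard-library sketch. It decodes the payload without verifying the signature, so it is only for local debugging, and it assumes the usual Flask cookie layout of a base64 payload, a timestamp, and a signature separated by dots (with a leading dot marking a zlib-compressed payload):

```python
import base64
import json
import zlib


def decode_flask_session_cookie(cookie):
    """Decode a Flask session cookie payload WITHOUT verifying it.

    Only useful for local debugging, e.g. to compare the csrf_token the
    server stored against the one submitted with the form.
    """
    compressed = cookie.startswith('.')   # leading dot marks zlib payloads
    if compressed:
        cookie = cookie[1:]
    payload = cookie.split('.')[0]        # drop timestamp and signature
    payload += '=' * (-len(payload) % 4)  # restore stripped base64 padding
    data = base64.urlsafe_b64decode(payload)
    if compressed:
        data = zlib.decompress(data)
    return json.loads(data)
```

With the cookie value copied from the browser, decode_flask_session_cookie(value) returns the session dictionary, where the stored CSRF token can be compared against the one in the submitted form.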

  • #48 Andrey said 2017-02-13T09:30:05Z

    OK, thank you, Miguel... I suspect I have to read the manual on nginx and test gunicorn on my local machine in place of the development server. The strange thing is, the error I got is not stable. I'll think about your words and see what I can do. If you find something bad in my code, please, let me know.

  • #49 Andrey said 2017-02-17T10:19:18Z

    It's me again. I tried gunicorn on my local machine in place of the development server. It does not work. At least I know who is responsible now.

  • #50 Andrey said 2017-03-11T14:32:05Z

    Hello, Miguel. It is me again, and my issue got solved. It turned out Gunicorn was not to blame. I had to change the configuration of my application and the way Celery was initialized. Now everything works fine and there are no CSRF errors on my local machine or on the virtual server where I test the solution.
