2018-05-01T23:17:57Z

The Flask Mega-Tutorial Part XXII: Background Jobs

This is the twenty second installment of the Flask Mega-Tutorial series, in which I'm going to tell you how to create background jobs that run independently of the web server.

For your reference, below is a list of the articles in this series.

Note 1: If you are looking for the legacy version of this tutorial, it's here.

Note 2: If you would like to support my work on this blog, or just don't have patience to wait for weekly articles, I am offering the complete version of this tutorial packaged as an ebook or a set of videos. For more information, visit courses.miguelgrinberg.com.

This chapter is dedicated to the implementation of long or complex processes that need to run as part of the application. These processes cannot be executed synchronously in the context of a request because that would block the response to the client for the duration of the task. I briefly touched on this topic in Chapter 10, when I moved the sending of emails to background threads to prevent the client from having to wait during those 3-4 seconds that it takes to send an email. While using threads for emails is acceptable, this solution does not scale well when the processes in question are much longer. The accepted practice is to offload long tasks to a worker process, or more likely to a pool of them.

To justify the need for having long running tasks, I'm going to introduce an export feature to Microblog through which users will be able to request a data file with all their blog posts. When a user makes use of this option, the application is going to start an export task that will generate a JSON file with all the user's posts, and then send it to the user by email. All this activity is going to happen in a worker process, and while it happens the user will see a notification showing the percentage of completion.

The GitHub links for this chapter are: Browse, Zip, Diff.

Introduction to Task Queues

Task queues provide a convenient solution for the application to request the execution of a task by a worker process. Worker processes run independently of the application and can even be located on a different system. The communication between the application and the workers is done through a message queue. The application submits a job, and then monitors its progress by interacting with the queue. The following diagram shows a typical implementation:

Task Queue Diagram

The most popular task queue for Python is Celery. This is a fairly sophisticated package that has many options and supports several message queues. Another popular Python task queue is Redis Queue or just RQ, which sacrifices some flexibility, such as only supporting a Redis message queue, but in exchange it is much simpler to set up than Celery.

Both Celery and RQ are perfectly adequate to support background tasks in a Flask application, so my choice for this application is going to favor the simplicity of RQ. However, implementing the same functionality with Celery should be relatively easy. If you are interested in Celery more than RQ, you can read the Using Celery with Flask article that I have on my blog.

Using RQ

RQ is a standard Python package, that is installed with pip:

(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt

As I mentioned earlier, the communication between the application and the RQ workers is going to be carried out in a Redis message queue, so you need to have a Redis server running. There are many options to get a Redis server installed and running, from one-click installers to downloading the source code and compiling it directly on your system. If you are using Windows, Microsoft maintains installers here. On Linux, you can likely get it as a package through your operating system's package manager. Mac OS X users can run brew install redis and then start the service manually with the redis-server command.

You will not need to interact with Redis at all outside of just ensuring that the service is running and accessible to RQ.

Note that RQ does not run on the Windows native Python interpreter. If you are using the Windows platform, you can only run RQ under Unix emulation. The two Unix emulation layers that I recommend to Windows users are Cygwin and the Windows Subsystem for Linux (WSL), and both are compatible with RQ.

Creating a Task

I'm going to show you how to run a simple task through RQ so that you familiarize with it. A task, is nothing more than a Python function. Here is an example task, that I'm going to put in a new app/tasks.py module:

app/tasks.py: Example background task.

import time

def example(seconds):
    print('Starting task')
    for i in range(seconds):
        print(i)
        time.sleep(1)
    print('Task completed')

This task takes a number of seconds as an argument, and then waits that amount of time, printing a counter once a second.

Running the RQ Worker

Now that the task is ready, a worker can be starter. This is done with the rq worker command:

(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...

The worker process is now connected to Redis, and watching for any jobs that may be assigned to it on a queue named microblog-tasks. In cases where you want to have multiple workers to have more throughput, all you need to do is run more instances of rq worker, all connected to the same queue. Then when a job shows up in the queue, any of the available worker processes will pick it up. In a production environment you will probably want to have at least as many workers as available CPUs.

Executing Tasks

Now open a second terminal window and activate the virtual environment on it. I'm going to use a shell session to kick off the example() task in the worker:

>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'

The Queue class from RQ represents the task queue as seen from the application side. The arguments it takes are the queue name, and a Redis connection object, which in this case I initialize with a default URL. If you have your Redis server running on a different host or port number, you will need to use a different URL.

The enqueue() method on the queue is used to add a job to the queue. The first argument is the name of the task you want to execute, given directly as a function object, or as an import string. I find the string option much more convenient, as that makes it unnecessary to import the function on the application's side. Any remaining arguments given to enqueue() are going to be passed to the function running in the worker.

As soon as you make the enqueue() call you are going to notice some activity on your first terminal window, the one running the RQ worker. You will see that the example() function is now running, and printing the counter once per second. At the same time, your other terminal is not blocked and you can continue evaluating expressions in the shell. In the example above, I called the job.get_id() method to obtain the unique identifier assigned to the task. Another interesting expression you can try with the job object is to check if the function on the worker has finished:

>>> job.is_finished
False

If you passed a 23 like I did in my example above, then the function is going to run for about 23 seconds. After that time, the job.is_finished expression will become True. Isn't this pretty cool? I really like the simplicity of RQ.

Once the function completes, the worker goes back to waiting for new jobs, so you can repeat the enqueue() call with different arguments if you want to experiment more. The data that is stored in the queue regarding a task will stay there for some time (500 seconds by default), but eventually will be removed. This is important, the task queue does not preserve a history of executed jobs.

Reporting Task Progress

The example task I have used above is unrealistically simple. Normally, for a long running task you will want some sort of progress information to be made available to the application, which in turn can show it to the user. RQ supports this by using the meta attribute of the job object. Let me rewrite the example() task to write progress reports:

app/tasks.py: Example background task with progress.

import time
from rq import get_current_job

def example(seconds):
    job = get_current_job()
    print('Starting task')
    for i in range(seconds):
        job.meta['progress'] = 100.0 * i / seconds
        job.save_meta()
        print(i)
        time.sleep(1)
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed')

This new version of example() uses RQ's get_current_job() function to get a job instance, which is similar to the one returned to the application when it submits the task. The meta attribute of the job object is a dictionary where the task can write any custom data that it wants to communicate to the application. In this example, I'm writing a progress item that represents the percentage of completion of the task. Each time the progress is updated I call job.save_meta() to instruct RQ to write the data to Redis, where the application can find it.

On the application side (currently just a Python shell), I can run this task and then monitor progress as follows:

>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True

As you can see above, on this side the meta attribute is available to read. The refresh() method needs to be invoked for the contents to be updated from Redis.

Database Representation of Tasks

For the example above it was enough to start a task and watch it run. For a web application things get a bit more complicated, because once one of these task is started as part of a request, that request is going to end, and all the context for that task is going to be lost. Because I want the application to keep track of what tasks each user is running, I need to use a database table to maintain some state. Below you can see the new Task model implementation:

app/models.py: Task model.

# ...
import redis
import rq

class User(UserMixin, db.Model):
    # ...
    tasks = db.relationship('Task', backref='user', lazy='dynamic')

# ...

class Task(db.Model):
    id = db.Column(db.String(36), primary_key=True)
    name = db.Column(db.String(128), index=True)
    description = db.Column(db.String(128))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    complete = db.Column(db.Boolean, default=False)

    def get_rq_job(self):
        try:
            rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
        except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
            return None
        return rq_job

    def get_progress(self):
        job = self.get_rq_job()
        return job.meta.get('progress', 0) if job is not None else 100

An interesting difference between this model and the previous ones is that the id primary key field is a string, not an integer. This is because for this model, I'm not going to rely on the database's own primary key generation and instead I'm going to use the job identifiers generated by RQ.

The model is going to store the task's fully qualified name (as passed to RQ), a description for the task that is appropriate for showing to users, a relationship to the user that requested the task, and a boolean that indicates if the task completed or not. The purpose of the complete field is to separate tasks that ended from those that are actively running, as running tasks require special handling to show progress updates.

The get_rq_job() method is a helper method that loads the RQ Job instance, from a given task id, which I can get from the model. This is done with Job.fetch(), which loads the Job instance from the data that exists in Redis about it. The get_progress() method builds on top of get_rq_job() and returns the progress percentage for the task. This method has a couple of interesting assumptions. If the job id from the model does not exist in the RQ queue, that means that the job already finished and the data expired and was removed from the queue, so in that case the percentage returned is 100. On the other extreme, if the job exists, but there is no information associated with the meta attribute, then it is safe to assume that the job is scheduled to run, but did not get a chance to start yet, so in that situation a 0 is returned as progress.

To apply the changes to the database schema, a new migration needs to be generated, and then the database upgraded:

(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade

The new model can also be added to the shell context, to make it accessible in shell sessions without having to import it:

microblog.py: Add Task model to shell context.

from app import create_app, db, cli
from app.models import User, Post, Message, Notification, Task

app = create_app()
cli.register(app)

@app.shell_context_processor
def make_shell_context():
    return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
            'Notification': Notification, 'Task': Task}

Integrating RQ with the Flask Application

The connection URL for the Redis service needs to be added to the configuration:

class Config(object):
    # ...
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'

As always, the Redis connection URL will be sourced from an environment variable, and if the variable isn't defined, a default URL that assumes the service is running on the same host and in the default port will be used.

The application factory function will be in charge of initializing Redis and RQ:

app/__init__.py: RQ integration.

# ...
from redis import Redis
import rq

# ...

def create_app(config_class=Config):
    # ...
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)

    # ...

The app.task_queue is going to be the queue where tasks are submitted. Having the queue attached to the application is convenient because anywhere in the application I can use current_app.task_queue to access it. To make it easy for any part of the application to submit or check on a task, I can create a few helper methods in the User model:

app/models.py: Task helper methods in the user model.

# ...

class User(UserMixin, db.Model):
    # ...

    def launch_task(self, name, description, *args, **kwargs):
        rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
                                                *args, **kwargs)
        task = Task(id=rq_job.get_id(), name=name, description=description,
                    user=self)
        db.session.add(task)
        return task

    def get_tasks_in_progress(self):
        return Task.query.filter_by(user=self, complete=False).all()

    def get_task_in_progress(self, name):
        return Task.query.filter_by(name=name, user=self,
                                    complete=False).first()

The launch_task() method takes care of submitting a task to the RQ queue, along with adding it to the database. The name argument is the function name, as defined in app/tasks.py. When submitting to RQ, the function prepends app.tasks. to this name to build the fully qualified function name. The description argument is a friendly description of the task that can be presented to users. For the function that export the blog posts, I will set the name to export_posts and the description to Exporting posts.... The remaining arguments are positional and keyword arguments that will be passed to the task. The function begins by calling the queue's enqueue() method to submit the job. The job object that is returned contains the task id assigned by RQ, so I can use that to create a corresponding Task object in my database.

Note that launch_task() adds the new task object to the session, but it does not issue a commit. In general, it is best to operate on the database session in the higher level functions, as that allows you to combine several updates made by lower level functions in a single transaction. This is not a strict rule, and in fact, you are going to see an exception where a commit is issued in a child function later in this chapter.

The get_tasks_in_progress() method returns the complete list of functions that are outstanding for the user. You will see later that I use this method to include information about running tasks in the pages that are rendered to the user.

Finally, the get_task_in_progress() is a simpler version of the previous one that returns a specific task. I prevent users from starting two or more tasks of the same type concurrently, so before I launch a task, I can use this method to find out if a previous task is currently running.

Sending Emails from the RQ Task

This may seem like a distraction from the main topic, but I said above that when the background export task completes, an email is going to be sent to the user with a JSON file that contains all the posts. The email functionality that I built in Chapter 11 needs to be extended in two ways. First, I need to add support for file attachments, so that I can attach a JSON file. Second, the send_email() function always sends emails asynchronously, using a background thread. When I'm going to send an email from a background task, which is already asynchronous, having a second level background task based on a thread makes little sense, so I need to support both synchronous and asynchronous email sending.

Luckily, Flask-Mail supports attachments, so all I need to do is extend the send_email() function to take them in an additional argument, and then configure them in the Message object. And to optionally send the email in the foreground, I just need to add a boolean sync argument:

app/email.py: Send emails with attachments.

# ...

def send_email(subject, sender, recipients, text_body, html_body,
               attachments=None, sync=False):
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    if attachments:
        for attachment in attachments:
            msg.attach(*attachment)
    if sync:
        mail.send(msg)
    else:
        Thread(target=send_async_email,
            args=(current_app._get_current_object(), msg)).start()

The attach() method of the Message class accepts three arguments that define an attachment: the filename, the media type, and the actual file data. The filename is just the name that the recipient will see associated with the attachment, it does not need to be a real file. The media type defines what type of attachment is this, which helps email readers render it appropriately. For example, if you send image/png as the media type, an email reader will know that the attachment is an image, in which case it can show it as such. For the blog post data file I'm going to use the JSON format, which uses a application/json media type. The third and last argument is a string or byte sequence with the contents of the attachment.

To make it simple, the attachments argument to send_email() is going to be a list of tuples, and each tuple is going to have three elements which correspond to the three arguments of attach(). So for each element in this list, I need to send the tuple as arguments to attach(). In Python, if you have a list or tuple with arguments that you want to send to a function, you can use func(*args) to have that list expanded into the actual argument list, instead of having to use a more tedious syntax such as func(args[0], args[1], args[2]). So for example, if you have args = [1, 'foo'], the call will send two arguments, same as if you called func(1, 'foo'). Without the *, the call would have a single argument which would be the list.

As far as the synchronous sending of the email, what I needed to do is just revert back to calling mail.send(msg) directly when sync is True.

Task Helpers

While the example() task I used above was a simple standalone function, the function that exports blog posts is going to need some of the functionality I have in the application, like access to the database and the email sending function. Because this is going to run in a separate process, I need to initialize Flask-SQLAlchemy and Flask-Mail, which in turn need a Flask application instance from which to get their configuration. So I'm going to add a Flask application instance and application context at the top of the app/tasks.py module:

app/tasks.py: Create application and context.

from app import create_app

app = create_app()
app.app_context().push()

The application is created in this module because this is the only module that the RQ worker is going to import. When you use the flask command, the microblog.py module in the root directory creates the application, but the RQ worker knows nothing about that, so it needs to create its own application instance if the task functions need it. You have seen the app.app_context() method in a couple of places already, pushing a context makes the application be the "current" application instance, and this enables extensions such as Flask-SQLAlchemy to use current_app.config to obtain their configuration. Without the context, the current_app expression would return an error.

I then started thinking about how I was going to report progress while this function is running. In addition to passing progress information through the job.meta dictionary, I'd like to push notifications to the client, so that the completion percentage can be updated dynamically without the user having to refresh the page. For this I'm going to use the notification mechanisms I built in Chapter 21. The updates are going to work in a very similar way to the unread messages badge. When the server renders a template, it will include "static" progress information obtained from job.meta, but then once the page is on the client's browser, the notifications are going to dynamically update the percentage using notifications. Because of the notifications, updating the progress of a running task is going to be slightly more involved than how I did it in the previous example, so I'm going to create a wrapper function dedicated to updating progress:

app/tasks.py: Set task progress.

from rq import get_current_job
from app import db
from app.models import Task

# ...

def _set_task_progress(progress):
    job = get_current_job()
    if job:
        job.meta['progress'] = progress
        job.save_meta()
        task = Task.query.get(job.get_id())
        task.user.add_notification('task_progress', {'task_id': job.get_id(),
                                                     'progress': progress})
        if progress >= 100:
            task.complete = True
        db.session.commit()

The export task can call _set_task_progress() to record the progress percentage. The function first writes the percentage to the job.meta dictionary and saves it to Redis, then it loads the corresponding task object from the database and uses task.user to push a notification to the user that requested the task, using the existing add_notification() method. The notification is going to be named task_progress, and the data associated with it is going to be a dictionary with two items, the task identifier and the progress number. Later I will add JavaScript code to act on this new notification type.

The function checks if the progress indicates that the function has completed, and in that case also updates the complete attribute of the task object in the database. The database commit call ensures that the task and the notification object added by add_notification() are both saved immediately to the database. I needed to be very careful in how I designed the parent task to not make any database changes, since this commit call would write those as well.

Implementing the Export Task

Now all the pieces are in place for me to write the export function. The high level structure of this function is going to be as follows:

app/tasks.py: Export posts general structure.

def export_posts(user_id):
    try:
        # read user posts from database
        # send email with data to user
    except:
        # handle unexpected errors

Why wrap the whole task in a try/except block? The application code that exists in request handlers is protected against unexpected errors because Flask itself catches exceptions and then handles them observing any error handlers and logging configuration I have set up for the application. This function, however, is going to run in a separate process that is controlled by RQ, not Flask, so if any unexpected errors occur the task will abort, RQ will display the error to the console and then will go back to wait for new jobs. So basically, unless you are watching the output of the RQ worker or logging it to a file, you will never find out there was an error.

Let's start looking at the three sections indicated with comments above with the simplest one, which is the error handling at the end:

app/tasks.py: Export posts error handling.

import sys
# ...

def export_posts(user_id):
    try:
        # ...
    except:
        _set_task_progress(100)
        app.logger.error('Unhandled exception', exc_info=sys.exc_info())

Whenever an unexpected error occurs, I'm going to mark the task as finished by setting the progress to 100%, and then use the logger object from the Flask application to log the error, along with the stack trace, information which is provided by the sys.exc_info() call. The nice thing about using the Flask application logger to log errors here as well is that any logging mechanisms you have implemented for the Flask application will be observed. For example, in Chapter 7 I configured errors to be sent out to the administrator email address. Just by using app.logger I also get that behavior for these errors.

Next, I'm going to code the actual export, which simply issues a database query and walks through the results in a loop, accumulating them in a dictionary:

app/tasks.py: Read user posts from the database.

import time
from app.models import User, Post

# ...

def export_posts(user_id):
    try:
        user = User.query.get(user_id)
        _set_task_progress(0)
        data = []
        i = 0
        total_posts = user.posts.count()
        for post in user.posts.order_by(Post.timestamp.asc()):
            data.append({'body': post.body,
                         'timestamp': post.timestamp.isoformat() + 'Z'})
            time.sleep(5)
            i += 1
            _set_task_progress(100 * i // total_posts)

        # send email with data to user
    except:
        # ...

For each post, the function is going to include a dictionary with two elements, the post body and the time the post was written. The time is going to be written in the ISO 8601 standard. The Python's datetime objects that I'm using do not store a timezone, so after I export the time in the ISO format I add the 'Z', which indicates UTC.

The code gets slightly complicated due to the need to keep track of progress. I maintain the counter i, and I need to issue an extra database query before I enter the loop for total_posts to have the number of posts. Using i and total_posts, each loop iteration can update the task progress with a number from 0 to 100.

You may have noticed that I also added a time.sleep(5) call in each loop iteration. The main reason I added the sleep is to make the export task last longer, and be able to see the progress go up even when the export covers just a handful of blog posts.

Below you can see the last part of the function, which sends an email to the user with all the information collected in data as an attachment:

app/tasks.py: Email posts to user.

import json
from flask import render_template
from app.email import send_email

# ...

def export_posts(user_id):
    try:
        # ...

        send_email('[Microblog] Your blog posts',
                sender=app.config['ADMINS'][0], recipients=[user.email],
                text_body=render_template('email/export_posts.txt', user=user),
                html_body=render_template('email/export_posts.html', user=user),
                attachments=[('posts.json', 'application/json',
                              json.dumps({'posts': data}, indent=4))],
                sync=True)
    except:
        # ...

This is simply a call to the send_email() function. The attachment is defined as a tuple with the three elements that are then passed to the attach() method of Flask-Mail's Message object. The third element in the tuple is the attachment contents, which are generated with Python's json.dumps() function.

There are a pair of new templates referenced here, which provide the contents of the email body in plain text and HTML form. Here is the text template:

app/templates/email/export_posts.txt: Export posts text email template.

Dear {{ user.username }},

Please find attached the archive of your posts that you requested.

Sincerely,

The Microblog Team

Here is the HTML version of the email:

app/templates/email/export_posts.html: Export posts HTML email template.

<p>Dear {{ user.username }},</p>
<p>Please find attached the archive of your posts that you requested.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>

Export Functionality in the Application

All the core pieces to support the background export tasks are now in place. What remains is to hook up this functionality to the application, so that users can place requests for their posts to be emailed to them.

Below you can see a new export_posts view function:

app/main/routes.py: Export posts route and view function.

@bp.route('/export_posts')
@login_required
def export_posts():
    if current_user.get_task_in_progress('export_posts'):
        flash(_('An export task is currently in progress'))
    else:
        current_user.launch_task('export_posts', _('Exporting posts...'))
        db.session.commit()
    return redirect(url_for('main.user', username=current_user.username))

The function first checks if the user has an outstanding export task, and in that case just flashes a message. It really makes no sense to have two export tasks for the same user at the same time, so this is prevented. I can check for this condition using the get_task_in_progress() method I implemented earlier.

If the user isn't already running an export, then launch_task() is invoked to start a one. The first argument is the name of the function that will be passed to the RQ worker, prefixed with app.tasks.. The second argument is just a friendly text description that will be shown to the user. Both values are written to the Task object in the database. The function ends with a redirect to the user profile page.

Now I need to expose a link to this route that the user can access to request the export. I think the most appropriate place to do this is in the user profile page, where the link can only be shown when users view their own page, right below the "Edit your profile" link:

app/templates/user.html: Export link in user profile page.

                ...
                <p>
                    <a href="{{ url_for('main.edit_profile') }}">
                        {{ _('Edit your profile') }}
                    </a>
                </p>
                {% if not current_user.get_task_in_progress('export_posts') %}
                <p>
                    <a href="{{ url_for('main.export_posts') }}">
                        {{ _('Export your posts') }}
                    </a>
                </p>
                ...
                {% endif %}

This link is tied to a conditional, because I don't want it to appear when the user already has an export in progress.

At this point the background jobs should be functional, but without giving any feedback to the user. If you want to try this out, you can start the application and the RQ worker as follows:

  • Make sure you have Redis running
  • In a first terminal window, start one or more instances of the RQ worker. For this you have to use the command rq worker microblog-tasks
  • In a second terminal window, start the Flask application with flask run (remember to set FLASK_APP first)

Progress Notifications

To wrap up this feature I want to inform the user when a background task is running, including a percentage of completion. In looking through the Bootstrap component options, I decided to use an alert below the navigation bar for this. Alerts are these color horizontal bars that display information to the user. The blue alert boxes are what I'm using to render flashed messages. Now I'm going to add a green one to show progress status. Below you can see how that is going to look:

Progress Alert

app/templates/base.html: Export progress alert in base template.

...
{% block content %}
    <div class="container">
        {% if current_user.is_authenticated %}
        {% with tasks = current_user.get_tasks_in_progress() %}
        {% if tasks %}
            {% for task in tasks %}
            <div class="alert alert-success" role="alert">
                {{ task.description }}
                <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
            </div>
            {% endfor %}
        {% endif %}
        {% endwith %}
        {% endif %}
        ...
{% endblock %}
...

The method to render the task alerts is almost identical to the flashed messages. The outer conditional skips all the alert related markup when the user is not logged in. For logged in users, I get the currently in-progress task list by calling the get_tasks_in_progress() method I created earlier. In the current version of the application I will only get one result at the most, since I don't allow more than one active export at a time, but in the future I may want to support other types of tasks that can coexist, so writing this in a generic way could save me time later.

For each task I write an alert element to the page. The color of the alert is controlled with the second CSS style, which in this case is alert-success, while in the case of the flashed messages was alert-info. The Bootstrap documentation includes the details on the HTML structure for the alerts. The text of the alert includes the description field stored in the Task model, followed by the completion percentage.

The percentage is wrapped in a <span> element that has a id attribute. The reason for this is that I'm going to refresh the percentage from JavaScript when notifications are received. The id that I'm using for a given task is constructed as the task id with -progress appended at the end. When a notification arrives, it will contain the task id, so I can easily locate the correct <span> element to update with a selector for #<task.id>-progress.

If you try the application at this point, you are going to see "static" progress updates, each time you navigate to a new page. You will notice that after you start an export task, you can freely navigate to different pages of the application, and the state of the running task is always recalled.

To prepare for applying dynamic updates to the percentage <span> elements, I'm going to write a little helper function in the JavaScript side:

app/templates/base.html: Helper function to dynamically update task progress.

...
{% block scripts %}
    ...
    <script>
        ...
        function set_task_progress(task_id, progress) {
            $('#' + task_id + '-progress').text(progress);
        }
    </script>
    ...
{% endblock %}

This function takes a task id and a progress value, and uses jQuery to locate the <span> element for this task and write the new progress as its contents. There is really no need to verify if the element exists on the page, because jQuery will do nothing if no elements are located with the given selector.

The notifications are already arriving to the browser because the _set_task_progress() function in app/tasks.py calls add_notification() each time the progress is updated. If you are confused about how these notifications could be reaching the browser without me having to do anything, it's really because in Chapter 21 I was wise to implement the notifications feature in a completely generic way. Any notifications that are added through the add_notification() method will be seen by the browser when it periodically asks the server for notification updates.

But the JavaScript code that processes these notifications only recognizes those that have a unread_message_count name, and ignores the rest. What I need to do now is expand that function to also handle task_progress notifications by calling the set_task_progress() function I defined above. Here is an updated version of the loop that processes notifications from JavaScript:

app/templates/base.html: Notification handler.

                        for (var i = 0; i < notifications.length; i++) {
                            switch (notifications[i].name) {
                                case 'unread_message_count':
                                    set_message_count(notifications[i].data);
                                    break;
                                case 'task_progress':
                                    set_task_progress(
                                        notifications[i].data.task_id,
                                        notifications[i].data.progress);
                                    break;
                            }
                            since = notifications[i].timestamp;
                        }

Now that I need to handle two different notifications, I decided to replace the if statement that checked for the unread_message_count notification name with a switch statement that contains one section for each of the notifications I now need to support. If you are not very familiar with the "C" family of languages you may not have seen switch statements before. These provide a convenient syntax that replaces a long chain of if/elseif statements. This is nice because as I need to support more notifications, I can simply keep adding them as additional case blocks.

If you recall, the data that the RQ task attaches to the task_progress notification is a dictionary with two elements, task_id and progress, which are the two arguments that I need to use to invoke set_task_progress().

If you run the application now, the progress indicator in the green alert box is going to refresh every 10 seconds, as notifications are delivered to the client.

Because I have introduced new translatable strings in this chapter, the translation files need to be updated. If you are maintaining a non-English language file, you need to use Flask-Babel to refresh your translation files and then add new translations:

(venv) $ flask translate update

If you are using the Spanish translation, then I have done the translation work for you, so you can just extract the app/translations/es/LC_MESSAGES/messages.po files from the download package for this chapter and add it to your project.

After the translations are done, you have to compile the translation files:

(venv) $ flask translate compile

Deployment Considerations

To complete this chapter, I want to discuss how the deployment of the application changes. To support background tasks I have added two new components to the stack, a Redis server, and one or more RQ workers. Obviously these need to be included in your deployment strategy, so I'm going to briefly go over the different deployment options I covered in previous chapters and how they are affected by these changes.

Deployment on a Linux Server

If you are running your application on a Linux server, adding Redis should be as simple as installing this package from your operating system. For Ubuntu Linux, you have to run sudo apt-get install redis-server.

To run the RQ worker process, you can follow the "Setting Up Gunicorn and Supervisor" section in Chapter 17 to create a second Supervisor configuration in which you run rq worker microblog-tasks instead of gunicorn. If you want to run more than one worker (and you probably should for production), you can use Supervisor's numprocs directive to indicate how many instances you want to have running concurrently.

Deployment on Heroku

To deploy the application on Heroku you are going to need to add a Redis service to your account. This is similar to the process that I used to add the Postgres database. Redis also has a free tier, which can be added with the following command:

$ heroku addons:create heroku-redis:hobby-dev

The access URL for your new redis service is going to be added to your Heroku environment as a REDIS_URL variable, which is exactly what the application expects.

The free plan in Heroku allows one web dyno and one worker dyno, so you can host a single rq worker along with your application without incurring into any expenses. For this you will need to declare the worker in a separate line in your procfile:

web: flask db upgrade; flask translate compile; gunicorn microblog:app
worker: rq worker -u $REDIS_URL microblog-tasks

After you deploy with these changes, you can start the worker with the following command:

$ heroku ps:scale worker=1

Deployment on Docker

If you are deploying the application to Docker containers, then you first need to create a Redis container. For this you can use one of the official Redis images from the Docker registry:

$ docker run --name redis -d -p 6379:6379 redis:3-alpine

When you run your application you will need to link the redis container and set the REDIS_URL environment variable, similar to how the MySQL container is linked. Here is a complete command to start the application including a redis link:

$ docker run --name microblog -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --link mysql:dbserver --link redis:redis-server \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
    -e REDIS_URL=redis://redis-server:6379/0 \
    microblog:latest

Finally, you will need to run one or more containers for the RQ workers. Because the workers are based on the same code as the main application, you can use the same container image you use for your application, overriding the start up command so that the worker is started instead of the web application. Here is an example docker run command that starts a worker:

$ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --link mysql:dbserver --link redis:redis-server \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
    -e REDIS_URL=redis://redis-server:6379/0 \
    --entrypoint venv/bin/rq \
    microblog:latest worker -u redis://redis-server:6379/0 microblog-tasks

Overriding the default start up command of a Docker image is a bit tricky because the command needs to be given in two parts. The --entrypoint argument takes just the executable name, but the arguments (if any) need to be given after the image and tag, at the end of the command line. Note that rq needs to be given as venv/bin/rq so that it works without having the virtual environment activated.

52 comments

  • #1 Italo Maia said 2018-05-02T02:53:43Z

    Hey Miguel, great chapter. Do you know of Flask-RQ2? Do you prefer Flask-RQ?

  • #2 Miguel Grinberg said 2018-05-02T05:13:15Z

    @Italo: As you can see in this article, I don't use an extension, I prefer to work with RQ directly.

  • #3 Brian said 2018-05-02T22:22:10Z

    Hi Miguel. Thanks for this tutorial. Running on Windows, I get the error "module 'os' has no attribute 'fork'" when I try to run "job = queue.enqueue('app.tasks.example', 23)". Searching around, it looks like this is a limitation of running Python on Windows. Do you know if this is the case? And if so, is there any workaround, other than finding a Linux machine to try this on?

  • #4 Miguel Grinberg said 2018-05-03T01:02:29Z

    @Brian: oh, that's interesting, I did not realize that the rq package relied on the fork call. The easiest workaround is to run the rq worker under a Linux emulation layer, either Cygwin, Mingw64 or the WSL. I'm going to add a note to the article regarding this limitation.

  • #5 Rohan said 2018-05-07T03:24:26Z

    Hi Miguel, I can't get the exporting task to work on Heroku. (It is working at localhost:5000.) When I deploy the app and check the Heroku logs, I see: 2018-05-07T03:14:51.843826+00:00 heroku[worker.1]: Starting process with command `rq worker microblog-tasks` 2018-05-07T03:14:52.443967+00:00 heroku[worker.1]: State changed from starting to up 2018-05-07T03:14:53.564240+00:00 app[worker.1]: Error 111 connecting to localhost:6379. Connection refused. 2018-05-07T03:14:53.642015+00:00 heroku[worker.1]: State changed from up to crashed 2018-05-07T03:14:53.623531+00:00 heroku[worker.1]: Process exited with status 1 Certainly localhost shouldn't be appearing there. I have followed your instructions very carefully, so I don't know what's wrong. Please advise. Thank you.

  • #6 Miguel Grinberg said 2018-05-07T05:54:27Z

    @Rohan: Check the value of the REDIS_URL environment variable in your Heroku project. It seems it isn't set to point to a redis server (I assume you did set up a Redis instance on Heroku?).

  • #7 Rohan said 2018-05-08T02:32:15Z

    I did add on heroku-redis as you instructed. The command "heroku config:get REDIS_URL" results in: redis://h:p23ccaf9db749cf631b6b462553407ae0e99286e3dbb9fec28283b60b88730ff5@ec2-52-55-90-143.compute-1.amazonaws.com:45769 Is this what you have in mind? What else might be wrong? Thank you.

  • #8 Miguel Grinberg said 2018-05-08T06:12:51Z

    @Rohan: okay, so REDIS_URL is set, but the application appears to not be using it. The logs that you showed me indicate that the Redis service was attempted at localhost:6379, which is the default. So you need to debug how the REDIS_URL environment variable is imported into the application configuration, there seems to be something broken there.

  • #9 enkrates said 2018-05-08T08:46:33Z

    Hi Miguel, what's the different between this tutorial series and the ebook with the same title?

  • #10 Miguel Grinberg said 2018-05-08T18:16:48Z

    @enkrates: there is no difference. You can buy th ebook if you would like to read the tutorial on your ebook reader, or if you want to support my work on this blog. The content is exactly the same.

  • #11 Rohan said 2018-05-09T21:53:09Z

    My issue remains, despite my efforts. For instance, I tried what is proposed at https://devcenter.heroku.com/articles/python-rq, including adding on redistogo. I am still getting the following in my logs: 2018-05-09T21:40:45.548713+00:00 heroku[worker.1]: Starting process with command `rq worker microblog-tasks` 2018-05-09T21:40:46.031068+00:00 heroku[web.1]: Starting process with command `flask db upgrade; flask translate compile; gunicorn microblog:app` 2018-05-09T21:40:46.394681+00:00 heroku[worker.1]: State changed from starting to up 2018-05-09T21:40:46.394681+00:00 heroku[worker.1]: State changed from starting to up 2018-05-09T21:40:47.583502+00:00 app[worker.1]: Error 111 connecting to localhost:6379. Connection refused. 2018-05-09T21:40:47.676219+00:00 heroku[worker.1]: Process exited with status 1 2018-05-09T21:40:48.011667+00:00 heroku[worker.1]: State changed from up to crashed Have you been able to get the task of exporting posts to work at https://flask-microblog.herokuapp.com? I don't see the link to export my posts.

  • #12 Miguel Grinberg said 2018-05-10T00:37:39Z

    @Rohan: The version that I have deployed is the one that matches the code on the Heroku chapter, I don't want to confuse people by having a newer version there. Can you add some print statements near the place where the redis connection URL is passed to RQ? The logs show that the application is attempting to connect to locahost:6379, that makes me think something in your configuration isn't right.

  • #13 Niels-Christian F. Bagger said 2018-05-13T11:19:37Z

    Dear Miguel, Thank you so much for your blog. I have really learned a lot and I will surely buy your book and videos to support your great work. I have a question regarding localizing the email sent by the tasks. I tried to encapsulate the texts in the templates inside _(), but then I get a runtime error from the worker: "Working outside of request context". Do you have a suggestion on how to add the localization to the export_posts email templates? Kind regards, Niels-Christian

  • #14 Thijs Van Dongen said 2018-05-13T17:20:11Z

    Hi Miguel, Great tutorial - once again. I am running into some troubles with Cygwin - hopefully you can point me in the right direction. I have trouble to get the venv activated in order to create the RQ worker via "(venv) $ rq worker microblog-tasks". As I created the venv under windows I do not have the bin folder that you would normally have (when not on windows) to activate the venv. Also when I try activivating the venv via the windows bat-file I get the following errors. $ source venv/scripts/activate -bash: $'\r': command not found -bash: venv/scripts/activate: line 4: syntax error near unexpected token `$'{\r'' 'bash: venv/scripts/activate: line 4: `deactivate () { Do you know how I can activate the virtual environment from Cygwin that was created under windows?

  • #15 Miguel Grinberg said 2018-05-14T21:07:29Z

    @Niels: the export posts is interesting, because the email is sent from the RQ worker process, which has no knowledge of what the client's selection of language is. The working outside of the request context happens because Flask-Babel invokes the locale selector callback, and this function tries to use the request object to determine what language to use. What you can do to address this problem is to first, pass the language selected for the client to the background task. The language is known in the view function as g.locale. Then in the task, you can store the language back to the g object, and then the locale selector callback can return return g.locale if it determines that there is no request context active.

  • #16 Miguel Grinberg said 2018-05-14T21:19:21Z

    @Thijs: You seem to have windows line endings in your virtualenv files, which make bash fail. Make sure you have your disk mounted in binary mode, and then recreate the virtualenv.

  • #17 Chee said 2018-05-15T12:41:44Z

    Hi Miguel, I've a slightly different use case, that need to run a backend job once a day. Do u have any recommendation way to schedule the redis job?

  • #18 Miguel Grinberg said 2018-05-15T14:19:52Z

    @Chee: if this is a job that will be running every day at the same time I would not use RQ. Just write it as a standalone script and create a cron job for it. You can still create a Flask application instance in this script, and push an application context if the things that you do require it.

  • #19 Mubi said 2018-05-15T22:21:03Z

    Hi Michael, I'm working through this chapter, great tutorial so far. I get stuck on the part right before Progress Notifications, testing the background job functionality with redis-server running and the 'rq worker microblog-tasks' executing the 'export_posts' function. My rq worker log displays the following: 18:04:57 microblog-tasks: app.tasks.export_posts(4) (f4dafa3f-5476-4863-b602-97453dcebe26) user_id4 job_id: f4dafa3f-5476-4863-b602-97453dcebe26 task: None job_id: f4dafa3f-5476-4863-b602-97453dcebe26 task: None 18:04:58 AttributeError: 'NoneType' object has no attribute 'user' Traceback (most recent call last): File "./app/tasks.py", line 49, in export_posts _set_task_progress(0) File "./app/tasks.py", line 36, in _set_task_progress task.user.add_notification('task_progress', {'task_id': job.get_id(), AttributeError: 'NoneType' object has no attribute 'user' Seems like rq's get_current_job function is successfully getting the job back, but Task.query.get(job.get_id()) is not finding the job in the Task db objects, because when I print the task above the value is None. Any help appreciated!

  • #20 Miguel Grinberg said 2018-05-16T00:39:11Z

    @Mubi: are you writing the Task object from the Flask side? This is done in the launch_task() function. If you are, then I wonder if you are seeing a race condition, where the RQ worker starts working on the task before the Task object is written to the database.

  • #21 Mubi said 2018-05-16T01:39:23Z

    Hi Miguel, Yes, I am writing the Task object from the Flask side inside 'launch_task()' The weird thing is the job_id from the rq worker matches the printed task id above, but the Task object off the db using Task.query.get(job.get_id()) returns None. I tried putting a 'time.sleep(15)' before retrieving the Task in '_set_task_progress' before 'Task = Task.query.get(job.get_id())'. I just don't know why the Task isn't written to the db. Appreciate your help!

  • #22 Miguel Grinberg said 2018-05-16T04:47:30Z

    @Mubi: this happens with my own code, or do you have your own version? You may want to check out my code if you are not using that, just as a way to check if your code might have something wrong.

  • #23 Jubaer Hossain said 2018-05-25T13:58:38Z

    Hi Miguel, Absolutely brilliant tutorial! I'm having trouble dealing with http time out issues. My server runs an audio processing software. When I try to upload long audio file, it faces time out problem. I want to do the uploading in asynchronous mode using flask Restful. Could you please suggest me something. It would be a great help for me ! Thanks in Advance

  • #24 Miguel Grinberg said 2018-05-26T22:43:52Z

    @Jubaer: hard for me to help you without seeing your code and knowing how reliable your network is. You may want to consider running the file upload "by hand", by having the client send chunks of the file one after another, and retrying any that fail. The FileReader API available in most browsers these days can be used to implement your own file upload mechanism, which can be more flexible.

  • #25 Tho said 2018-05-29T03:24:02Z

    Hi Miguel, Thanks for your helpful tutorials. I have one question. My app uses Flask with Mongoengine. I wonder I can integrate the RQ into my app to make background-jobs or any advice for me? Appreciate your help!

Leave a Comment