2018-05-01T23:17:57Z

The Flask Mega-Tutorial Part XXII: Background Jobs

This is the twenty-second installment of the Flask Mega-Tutorial series, in which I'm going to tell you how to create background jobs that run independently of the web server.

Note 1: If you are looking for the legacy version of this tutorial, it's here.

Note 2: If you would like to support my work on this blog, or just don't have patience to wait for weekly articles, I am offering the complete version of this tutorial packaged as an ebook or a set of videos. For more information, visit courses.miguelgrinberg.com.

This chapter is dedicated to the implementation of long or complex processes that need to run as part of the application. These processes cannot be executed synchronously in the context of a request because that would block the response to the client for the duration of the task. I briefly touched on this topic in Chapter 10, when I moved the sending of emails to background threads to prevent the client from having to wait during those 3-4 seconds that it takes to send an email. While using threads for emails is acceptable, this solution does not scale well when the processes in question are much longer. The accepted practice is to offload long tasks to a worker process, or more likely to a pool of them.

To justify the need for having long running tasks, I'm going to introduce an export feature to Microblog through which users will be able to request a data file with all their blog posts. When a user makes use of this option, the application is going to start an export task that will generate a JSON file with all the user's posts, and then send it to the user by email. All this activity is going to happen in a worker process, and while it happens the user will see a notification showing the percentage of completion.

The GitHub links for this chapter are: Browse, Zip, Diff.

Introduction to Task Queues

Task queues provide a convenient solution for the application to request the execution of a task by a worker process. Worker processes run independently of the application and can even be located on a different system. The communication between the application and the workers is done through a message queue. The application submits a job, and then monitors its progress by interacting with the queue. The following diagram shows a typical implementation:

Task Queue Diagram

The most popular task queue for Python is Celery. This is a fairly sophisticated package that has many options and supports several message queues. Another popular Python task queue is Redis Queue or just RQ, which sacrifices some flexibility, such as only supporting a Redis message queue, but in exchange it is much simpler to set up than Celery.

Both Celery and RQ are perfectly adequate to support background tasks in a Flask application, so my choice for this application is going to favor the simplicity of RQ. However, implementing the same functionality with Celery should be relatively easy. If you are interested in Celery more than RQ, you can read the Using Celery with Flask article that I have on my blog.

Using RQ

RQ is a standard Python package that is installed with pip:

(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt

As I mentioned earlier, the communication between the application and the RQ workers is going to be carried out in a Redis message queue, so you need to have a Redis server running. There are many options to get a Redis server installed and running, from one-click installers to downloading the source code and compiling it directly on your system. If you are using Windows, Microsoft maintains installers here. On Linux, you can likely get it as a package through your operating system's package manager. Mac OS X users can run brew install redis and then start the service manually with the redis-server command.

You will not need to interact with Redis at all outside of just ensuring that the service is running and accessible to RQ.
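
If you want to double-check that the server is up before continuing, the redis-cli tool that comes with most Redis installations provides a quick test (this assumes the server is running on localhost and the default port):

$ redis-cli ping
PONG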

Note that RQ does not run on the Windows native Python interpreter. If you are using the Windows platform, you can only run RQ under Unix emulation. The two Unix emulation layers that I recommend to Windows users are Cygwin and the Windows Subsystem for Linux (WSL), and both are compatible with RQ.

Creating a Task

I'm going to show you how to run a simple task through RQ so that you can familiarize yourself with it. A task is nothing more than a Python function. Here is an example task, which I'm going to put in a new app/tasks.py module:

app/tasks.py: Example background task.

import time

def example(seconds):
    print('Starting task')
    for i in range(seconds):
        print(i)
        time.sleep(1)
    print('Task completed')

This task takes a number of seconds as an argument, and then waits that amount of time, printing a counter once a second.

Running the RQ Worker

Now that the task is ready, a worker can be started. This is done with the rq worker command:

(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...

The worker process is now connected to Redis, and watching for any jobs that may be assigned to it on a queue named microblog-tasks. If you want multiple workers for more throughput, all you need to do is run more instances of rq worker, all connected to the same queue. Then when a job shows up in the queue, any of the available worker processes will pick it up. In a production environment you will probably want to have at least as many workers as available CPUs.

Executing Tasks

Now open a second terminal window and activate the virtual environment in it. I'm going to use a Python shell session to kick off the example() task in the worker:

>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'

The Queue class from RQ represents the task queue as seen from the application side. The arguments it takes are the queue name, and a Redis connection object, which in this case I initialize with a default URL. If you have your Redis server running on a different host or port number, you will need to use a different URL.

The enqueue() method on the queue is used to add a job to the queue. The first argument is the name of the task you want to execute, given directly as a function object, or as an import string. I find the string option much more convenient, as that makes it unnecessary to import the function on the application's side. Any remaining arguments given to enqueue() are going to be passed to the function running in the worker.
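
For completeness, this is roughly what the same submission would look like using the function object option (a sketch; it assumes the app.tasks module can be imported from the shell session):

>>> from app.tasks import example
>>> job = queue.enqueue(example, 23)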

As soon as you make the enqueue() call you are going to notice some activity on your first terminal window, the one running the RQ worker. You will see that the example() function is now running, and printing the counter once per second. At the same time, your other terminal is not blocked and you can continue evaluating expressions in the shell. In the example above, I called the job.get_id() method to obtain the unique identifier assigned to the task. Another interesting expression you can try with the job object is to check if the function on the worker has finished:

>>> job.is_finished
False

If you passed a 23 like I did in my example above, then the function is going to run for about 23 seconds. After that time, the job.is_finished expression will become True. Isn't this pretty cool? I really like the simplicity of RQ.

Once the function completes, the worker goes back to waiting for new jobs, so you can repeat the enqueue() call with different arguments if you want to experiment more. The data that is stored in the queue regarding a task will stay there for some time (500 seconds by default), but eventually will be removed. This is important: the task queue does not preserve a history of executed jobs.
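
If you want the job data to be kept for a different amount of time, RQ accepts an optional result_ttl argument, given in seconds, when the job is enqueued. This is just an optional tweak, not something this application needs:

>>> job = queue.enqueue('app.tasks.example', 23, result_ttl=3600)  # keep job data for an hour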

Reporting Task Progress

The example task I have used above is unrealistically simple. Normally, for a long running task you will want some sort of progress information to be made available to the application, which in turn can show it to the user. RQ supports this by using the meta attribute of the job object. Let me rewrite the example() task to write progress reports:

app/tasks.py: Example background task with progress.

import time
from rq import get_current_job

def example(seconds):
    job = get_current_job()
    print('Starting task')
    for i in range(seconds):
        job.meta['progress'] = 100.0 * i / seconds
        job.save_meta()
        print(i)
        time.sleep(1)
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed')

This new version of example() uses RQ's get_current_job() function to get a job instance, which is similar to the one returned to the application when it submits the task. The meta attribute of the job object is a dictionary where the task can write any custom data that it wants to communicate to the application. In this example, I'm writing a progress item that represents the percentage of completion of the task. Each time the progress is updated I call job.save_meta() to instruct RQ to write the data to Redis, where the application can find it.

On the application side (currently just a Python shell), I can run this task and then monitor progress as follows:

>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True

As you can see above, on this side the meta attribute is available to read. The refresh() method needs to be invoked for the contents to be updated from Redis.
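
If you prefer not to call refresh() by hand, a small polling loop such as the following sketch, run in the same shell session, shows the progress as it changes:

>>> import time
>>> job = queue.enqueue('app.tasks.example', 10)
>>> while not job.is_finished:
...     job.refresh()
...     print(job.meta.get('progress', 0))
...     time.sleep(2)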

Database Representation of Tasks

For the example above it was enough to start a task and watch it run. For a web application things get a bit more complicated, because once one of these tasks is started as part of a request, that request is going to end, and all the context for that task is going to be lost. Because I want the application to keep track of what tasks each user is running, I need to use a database table to maintain some state. Below you can see the new Task model implementation:

app/models.py: Task model.

# ...
import redis
import rq

class User(UserMixin, db.Model):
    # ...
    tasks = db.relationship('Task', backref='user', lazy='dynamic')

# ...

class Task(db.Model):
    id = db.Column(db.String(36), primary_key=True)
    name = db.Column(db.String(128), index=True)
    description = db.Column(db.String(128))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    complete = db.Column(db.Boolean, default=False)

    def get_rq_job(self):
        try:
            rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
        except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
            return None
        return rq_job

    def get_progress(self):
        job = self.get_rq_job()
        return job.meta.get('progress', 0) if job is not None else 100

An interesting difference between this model and the previous ones is that the id primary key field is a string, not an integer. This is because for this model, I'm not going to rely on the database's own primary key generation and instead I'm going to use the job identifiers generated by RQ.

The model is going to store the task's fully qualified name (as passed to RQ), a description for the task that is appropriate for showing to users, a relationship to the user that requested the task, and a boolean that indicates if the task completed or not. The purpose of the complete field is to separate tasks that ended from those that are actively running, as running tasks require special handling to show progress updates.

The get_rq_job() helper method loads the RQ Job instance from a given task id, which I can get from the model. This is done with Job.fetch(), which loads the Job instance from the data that exists in Redis about it. The get_progress() method builds on top of get_rq_job() and returns the progress percentage for the task. This method makes a couple of interesting assumptions. If the job id from the model does not exist in the RQ queue, that means that the job already finished and the data expired and was removed from the queue, so in that case the percentage returned is 100. At the other extreme, if the job exists, but there is no information associated with the meta attribute, then it is safe to assume that the job is scheduled to run, but did not get a chance to start yet, so in that situation a 0 is returned as progress.
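
You can see the first of these two cases from a Python shell. Fetching a job id that Redis does not know about raises the exception that get_rq_job() catches (the exact error text may vary between RQ versions):

>>> import rq
>>> from redis import Redis
>>> rq.job.Job.fetch('invalid-id', connection=Redis.from_url('redis://'))
Traceback (most recent call last):
  ...
rq.exceptions.NoSuchJobError: No such job: rq:job:invalid-id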

To apply the changes to the database schema, a new migration needs to be generated, and then the database upgraded:

(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade

The new model can also be added to the shell context, to make it accessible in shell sessions without having to import it:

microblog.py: Add Task model to shell context.

from app import create_app, db, cli
from app.models import User, Post, Message, Notification, Task

app = create_app()
cli.register(app)

@app.shell_context_processor
def make_shell_context():
    return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
            'Notification': Notification, 'Task': Task}

Integrating RQ with the Flask Application

The connection URL for the Redis service needs to be added to the configuration:

config.py: Add Redis configuration.

class Config(object):
    # ...
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'

As always, the Redis connection URL will be sourced from an environment variable, and if the variable isn't defined, a default URL that assumes the service is running on the same host and on the default port will be used.
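
For example, if your Redis server runs on a different host, you can set the variable in the terminal session before starting the application (the host shown here is hypothetical):

(venv) $ export REDIS_URL=redis://192.168.0.10:6379/0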

The application factory function will be in charge of initializing Redis and RQ:

app/__init__.py: RQ integration.

# ...
from redis import Redis
import rq

# ...

def create_app(config_class=Config):
    # ...
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)

    # ...

The app.task_queue is going to be the queue where tasks are submitted. Having the queue attached to the application is convenient because anywhere in the application I can use current_app.task_queue to access it. To make it easy for any part of the application to submit or check on a task, I can create a few helper methods in the User model:

app/models.py: Task helper methods in the user model.

# ...

class User(UserMixin, db.Model):
    # ...

    def launch_task(self, name, description, *args, **kwargs):
        rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
                                                *args, **kwargs)
        task = Task(id=rq_job.get_id(), name=name, description=description,
                    user=self)
        db.session.add(task)
        return task

    def get_tasks_in_progress(self):
        return Task.query.filter_by(user=self, complete=False).all()

    def get_task_in_progress(self, name):
        return Task.query.filter_by(name=name, user=self,
                                    complete=False).first()

The launch_task() method takes care of submitting a task to the RQ queue, along with adding it to the database. The name argument is the function name, as defined in app/tasks.py. When submitting to RQ, the function prepends app.tasks. to this name to build the fully qualified function name. The description argument is a friendly description of the task that can be presented to users. For the function that exports the blog posts, I will set the name to export_posts and the description to Exporting posts.... The remaining arguments are positional and keyword arguments that will be passed to the task. The function begins by calling the queue's enqueue() method to submit the job. The job object that is returned contains the task id assigned by RQ, so I can use that to create a corresponding Task object in my database.

Note that launch_task() adds the new task object to the session, but it does not issue a commit. In general, it is best to operate on the database session in the higher level functions, as that allows you to combine several updates made by lower level functions in a single transaction. This is not a strict rule, and in fact, you are going to see an exception where a commit is issued in a child function later in this chapter.

The get_tasks_in_progress() method returns the complete list of tasks that are outstanding for the user. You will see later that I use this method to include information about running tasks in the pages that are rendered to the user.

Finally, get_task_in_progress() is a simpler version of the previous one that returns a specific task. I prevent users from starting two or more tasks of the same type concurrently, so before I launch a task, I can use this method to find out if a previous task is currently running.

Sending Emails from the RQ Task

This may seem like a distraction from the main topic, but I said above that when the background export task completes, an email is going to be sent to the user with a JSON file that contains all the posts. The email functionality that I built in Chapter 10 needs to be extended in two ways. First, I need to add support for file attachments, so that I can attach a JSON file. Second, the send_email() function always sends emails asynchronously, using a background thread. When sending an email from a background task, which is already asynchronous, having a second-level background task based on a thread makes little sense, so I need to support both synchronous and asynchronous email sending.

Luckily, Flask-Mail supports attachments, so all I need to do is extend the send_email() function to accept them in an additional argument, and then configure them in the Message object. And to optionally send the email in the foreground, I just need to add a boolean sync argument:

app/email.py: Send emails with attachments.

# ...

def send_email(subject, sender, recipients, text_body, html_body,
               attachments=None, sync=False):
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    if attachments:
        for attachment in attachments:
            msg.attach(*attachment)
    if sync:
        mail.send(msg)
    else:
        Thread(target=send_async_email,
            args=(current_app._get_current_object(), msg)).start()

The attach() method of the Message class accepts three arguments that define an attachment: the filename, the media type, and the actual file data. The filename is just the name that the recipient will see associated with the attachment, it does not need to be a real file. The media type defines what type of attachment this is, which helps email readers render it appropriately. For example, if you send image/png as the media type, an email reader will know that the attachment is an image, in which case it can show it as such. For the blog post data file I'm going to use the JSON format, which uses the application/json media type. The third and last argument is a string or byte sequence with the contents of the attachment.

To make it simple, the attachments argument to send_email() is going to be a list of tuples, and each tuple is going to have three elements which correspond to the three arguments of attach(). So for each element in this list, I need to send the tuple as arguments to attach(). In Python, if you have a list or tuple with arguments that you want to send to a function, you can use func(*args) to have that list expanded into the actual argument list, instead of having to use a more tedious syntax such as func(args[0], args[1], args[2]). So for example, if you have args = [1, 'foo'], the call will send two arguments, same as if you called func(1, 'foo'). Without the *, the call would have a single argument which would be the list.
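
If argument expansion is new to you, here is a tiny standalone sketch (the function and values are made up just for illustration):

def attach_example(filename, media_type, data):
    # stand-in for msg.attach(), just prints its three arguments
    print(filename, media_type, data)

attachment = ('posts.json', 'application/json', '{"posts": []}')
attach_example(*attachment)  # same as attach_example('posts.json', 'application/json', '{"posts": []}')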

As far as sending the email synchronously, all I needed to do is revert back to calling mail.send(msg) directly when sync is True.

Task Helpers

While the example() task I used above was a simple standalone function, the function that exports blog posts is going to need some of the functionality I have in the application, like access to the database and the email sending function. Because this is going to run in a separate process, I need to initialize Flask-SQLAlchemy and Flask-Mail, which in turn need a Flask application instance from which to get their configuration. So I'm going to add a Flask application instance and application context at the top of the app/tasks.py module:

app/tasks.py: Create application and context.

from app import create_app

app = create_app()
app.app_context().push()

The application is created in this module because this is the only module that the RQ worker is going to import. When you use the flask command, the microblog.py module in the root directory creates the application, but the RQ worker knows nothing about that, so it needs to create its own application instance if the task functions need it. You have seen the app.app_context() method in a couple of places already. Pushing a context makes the application the "current" application instance, which enables extensions such as Flask-SQLAlchemy to use current_app.config to obtain their configuration. Without the context, the current_app expression would raise an error.
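
As a quick illustration of why the context matters, this is more or less what you would see if current_app were used without one (the exact error wording may depend on your Flask version):

>>> from flask import current_app
>>> current_app.config['REDIS_URL']
Traceback (most recent call last):
  ...
RuntimeError: Working outside of application context.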

I then started thinking about how I was going to report progress while this function is running. In addition to passing progress information through the job.meta dictionary, I'd like to push notifications to the client, so that the completion percentage can be updated dynamically without the user having to refresh the page. For this I'm going to use the notification mechanisms I built in Chapter 21. The updates are going to work in a very similar way to the unread messages badge. When the server renders a template, it will include "static" progress information obtained from job.meta, but then once the page is on the client's browser, the completion percentage is going to be dynamically updated through notifications. Because of the notifications, updating the progress of a running task is going to be slightly more involved than how I did it in the previous example, so I'm going to create a wrapper function dedicated to updating progress:

app/tasks.py: Set task progress.

from rq import get_current_job
from app import db
from app.models import Task

# ...

def _set_task_progress(progress):
    job = get_current_job()
    if job:
        job.meta['progress'] = progress
        job.save_meta()
        task = Task.query.get(job.get_id())
        task.user.add_notification('task_progress', {'task_id': job.get_id(),
                                                     'progress': progress})
        if progress >= 100:
            task.complete = True
        db.session.commit()

The export task can call _set_task_progress() to record the progress percentage. The function first writes the percentage to the job.meta dictionary and saves it to Redis, then it loads the corresponding task object from the database and uses task.user to push a notification to the user that requested the task, using the existing add_notification() method. The notification is going to be named task_progress, and the data associated with it is going to be a dictionary with two items, the task identifier and the progress number. Later I will add JavaScript code to act on this new notification type.

The function also checks if the progress indicates that the task has completed, and in that case updates the complete attribute of the task object in the database. The database commit call ensures that the task and the notification object added by add_notification() are both saved immediately to the database. I needed to be very careful in how I designed the parent task to not make any database changes, since this commit call would write those as well.

Implementing the Export Task

Now all the pieces are in place for me to write the export function. The high level structure of this function is going to be as follows:

app/tasks.py: Export posts general structure.

def export_posts(user_id):
    try:
        # read user posts from database
        # send email with data to user
    except:
        # handle unexpected errors

Why wrap the whole task in a try/except block? The application code that exists in request handlers is protected against unexpected errors because Flask itself catches exceptions and then handles them observing any error handlers and logging configuration I have set up for the application. This function, however, is going to run in a separate process that is controlled by RQ, not Flask, so if any unexpected errors occur the task will abort, RQ will display the error to the console and then will go back to wait for new jobs. So basically, unless you are watching the output of the RQ worker or logging it to a file, you will never find out there was an error.

Let's start looking at the three sections indicated with comments above with the simplest one, which is the error handling at the end:

app/tasks.py: Export posts error handling.

import sys
# ...

def export_posts(user_id):
    try:
        # ...
    except:
        _set_task_progress(100)
        app.logger.error('Unhandled exception', exc_info=sys.exc_info())

Whenever an unexpected error occurs, I'm going to mark the task as finished by setting the progress to 100%, and then use the logger object from the Flask application to log the error, along with the stack trace, information which is provided by the sys.exc_info() call. The nice thing about using the Flask application logger to log errors here as well is that any logging mechanisms you have implemented for the Flask application will be observed. For example, in Chapter 7 I configured errors to be sent out to the administrator email address. Just by using app.logger I also get that behavior for these errors.

Next, I'm going to code the actual export, which simply issues a database query and walks through the results in a loop, accumulating them in a dictionary:

app/tasks.py: Read user posts from the database.

import time
from app.models import User, Post

# ...

def export_posts(user_id):
    try:
        user = User.query.get(user_id)
        _set_task_progress(0)
        data = []
        i = 0
        total_posts = user.posts.count()
        for post in user.posts.order_by(Post.timestamp.asc()):
            data.append({'body': post.body,
                         'timestamp': post.timestamp.isoformat() + 'Z'})
            time.sleep(5)
            i += 1
            _set_task_progress(100 * i // total_posts)

        # send email with data to user
    except:
        # ...

For each post, the function is going to include a dictionary with two elements, the post body and the time the post was written. The time is going to be written in the ISO 8601 standard. The datetime objects that I'm using do not store a timezone, so after I export the time in the ISO format I add the 'Z', which indicates UTC.

The code gets slightly complicated due to the need to keep track of progress. I maintain the counter i, and I need to issue an extra database query before entering the loop to obtain total_posts, the total number of posts. Using i and total_posts, each loop iteration can update the task progress with a number from 0 to 100.

You may have noticed that I also added a time.sleep(5) call in each loop iteration. The main reason I added the sleep is to make the export task last longer, and be able to see the progress go up even when the export covers just a handful of blog posts.

Below you can see the last part of the function, which sends an email to the user with all the information collected in data as an attachment:

app/tasks.py: Email posts to user.

import json
from flask import render_template
from app.email import send_email

# ...

def export_posts(user_id):
    try:
        # ...

        send_email('[Microblog] Your blog posts',
                sender=app.config['ADMINS'][0], recipients=[user.email],
                text_body=render_template('email/export_posts.txt', user=user),
                html_body=render_template('email/export_posts.html', user=user),
                attachments=[('posts.json', 'application/json',
                              json.dumps({'posts': data}, indent=4))],
                sync=True)
    except:
        # ...

This is simply a call to the send_email() function. The attachment is defined as a tuple with the three elements that are then passed to the attach() method of Flask-Mail's Message object. The third element in the tuple is the attachment contents, which are generated with Python's json.dumps() function.
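
To give you an idea of what the user receives, the attached posts.json file would look more or less like this (with made up post contents):

{
    "posts": [
        {
            "body": "Hello, world!",
            "timestamp": "2018-05-01T12:34:56Z"
        },
        {
            "body": "My second post.",
            "timestamp": "2018-05-02T08:00:00Z"
        }
    ]
}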

There are two new templates referenced here, which provide the contents of the email body in plain text and HTML form. Here is the text template:

app/templates/email/export_posts.txt: Export posts text email template.

Dear {{ user.username }},

Please find attached the archive of your posts that you requested.

Sincerely,

The Microblog Team

Here is the HTML version of the email:

app/templates/email/export_posts.html: Export posts HTML email template.

<p>Dear {{ user.username }},</p>
<p>Please find attached the archive of your posts that you requested.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>

Export Functionality in the Application

All the core pieces to support the background export tasks are now in place. What remains is to hook up this functionality to the application, so that users can place requests for their posts to be emailed to them.

Below you can see a new export_posts view function:

app/main/routes.py: Export posts route and view function.

@bp.route('/export_posts')
@login_required
def export_posts():
    if current_user.get_task_in_progress('export_posts'):
        flash(_('An export task is currently in progress'))
    else:
        current_user.launch_task('export_posts', _('Exporting posts...'))
        db.session.commit()
    return redirect(url_for('main.user', username=current_user.username))

The function first checks if the user has an outstanding export task, and in that case just flashes a message. It really makes no sense to have two export tasks for the same user at the same time, so this is prevented. I can check for this condition using the get_task_in_progress() method I implemented earlier.

If the user isn't already running an export, then launch_task() is invoked to start one. The first argument is the name of the function that will be passed to the RQ worker, prefixed with app.tasks.. The second argument is just a friendly text description that will be shown to the user. Both values are written to the Task object in the database. The function ends with a redirect to the user profile page.

Now I need to expose a link to this route that the user can access to request the export. I think the most appropriate place to do this is in the user profile page, where the link can only be shown when users view their own page, right below the "Edit your profile" link:

app/templates/user.html: Export link in user profile page.

                ...
                <p>
                    <a href="{{ url_for('main.edit_profile') }}">
                        {{ _('Edit your profile') }}
                    </a>
                </p>
                {% if not current_user.get_task_in_progress('export_posts') %}
                <p>
                    <a href="{{ url_for('main.export_posts') }}">
                        {{ _('Export your posts') }}
                    </a>
                </p>
                {% endif %}
                ...

This link is tied to a conditional, because I don't want it to appear when the user already has an export in progress.

At this point the background jobs should be functional, but without giving any feedback to the user. If you want to try this out, you can start the application and the RQ worker as follows:

  • Make sure you have Redis running
  • In a first terminal window, start one or more instances of the RQ worker. For this you have to use the command rq worker microblog-tasks
  • In a second terminal window, start the Flask application with flask run (remember to set FLASK_APP first)

Progress Notifications

To wrap up this feature I want to inform the user when a background task is running, including a percentage of completion. Looking through the Bootstrap component options, I decided to use an alert below the navigation bar for this. Alerts are colored horizontal bars that display information to the user. The blue alert boxes are what I'm using to render flashed messages. Now I'm going to add a green one to show progress status. Below you can see how that is going to look:

Progress Alert

app/templates/base.html: Export progress alert in base template.

...
{% block content %}
    <div class="container">
        {% if current_user.is_authenticated %}
        {% with tasks = current_user.get_tasks_in_progress() %}
        {% if tasks %}
            {% for task in tasks %}
            <div class="alert alert-success" role="alert">
                {{ task.description }}
                <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
            </div>
            {% endfor %}
        {% endif %}
        {% endwith %}
        {% endif %}
        ...
{% endblock %}
...

The way the task alerts are rendered is almost identical to that of the flashed messages. The outer conditional skips all the alert-related markup when the user is not logged in. For logged in users, I get the currently in-progress task list by calling the get_tasks_in_progress() method I created earlier. In the current version of the application I will only get one result at the most, since I don't allow more than one active export at a time, but in the future I may want to support other types of tasks that can coexist, so writing this in a generic way could save me time later.

For each task I write an alert element to the page. The color of the alert is controlled with the second CSS style, which in this case is alert-success, while in the case of the flashed messages it was alert-info. The Bootstrap documentation includes the details on the HTML structure for the alerts. The text of the alert includes the description field stored in the Task model, followed by the completion percentage.

The percentage is wrapped in a <span> element that has an id attribute. The reason for this is that I'm going to refresh the percentage from JavaScript when notifications are received. The id that I'm using for a given task is constructed as the task id with -progress appended at the end. When a notification arrives, it will contain the task id, so I can easily locate the correct <span> element to update with a selector for #<task.id>-progress.

If you try the application at this point, you are going to see "static" progress updates, each time you navigate to a new page. You will notice that after you start an export task, you can freely navigate to different pages of the application, and the state of the running task is always recalled.

To prepare for applying dynamic updates to the percentage <span> elements, I'm going to write a little helper function in the JavaScript side:

app/templates/base.html: Helper function to dynamically update task progress.

...
{% block scripts %}
    ...
    <script>
        ...
        function set_task_progress(task_id, progress) {
            $('#' + task_id + '-progress').text(progress);
        }
    </script>
    ...
{% endblock %}

This function takes a task id and a progress value, and uses jQuery to locate the <span> element for this task and write the new progress as its contents. There is really no need to verify if the element exists on the page, because jQuery will do nothing if no elements are located with the given selector.

The notifications are already reaching the browser, because the _set_task_progress() function in app/tasks.py calls add_notification() each time the progress is updated. If you are confused about how these notifications could be reaching the browser without me having to do anything, it's really because in Chapter 21 I was wise to implement the notifications feature in a completely generic way. Any notifications that are added through the add_notification() method will be seen by the browser when it periodically asks the server for notification updates.

But the JavaScript code that processes these notifications only recognizes those that have an unread_message_count name, and ignores the rest. What I need to do now is expand that function to also handle task_progress notifications by calling the set_task_progress() function I defined above. Here is an updated version of the loop that processes notifications from JavaScript:

app/templates/base.html: Notification handler.

                        for (var i = 0; i < notifications.length; i++) {
                            switch (notifications[i].name) {
                                case 'unread_message_count':
                                    set_message_count(notifications[i].data);
                                    break;
                                case 'task_progress':
                                    set_task_progress(
                                        notifications[i].data.task_id,
                                        notifications[i].data.progress);
                                    break;
                            }
                            since = notifications[i].timestamp;
                        }

Now that I need to handle two different notifications, I decided to replace the if statement that checked for the unread_message_count notification name with a switch statement that contains one section for each of the notifications I now need to support. If you are not very familiar with the "C" family of languages you may not have seen switch statements before. These provide a convenient syntax that replaces a long chain of if/else if statements. This is nice because as I need to support more notifications, I can simply keep adding them as additional case blocks.

If you recall, the data that the RQ task attaches to the task_progress notification is a dictionary with two elements, task_id and progress, which are the two arguments that I need to use to invoke set_task_progress().

If you run the application now, the progress indicator in the green alert box is going to refresh every 10 seconds, as notifications are delivered to the client.

Because I have introduced new translatable strings in this chapter, the translation files need to be updated. If you are maintaining a non-English language file, you need to use Flask-Babel to refresh your translation files and then add new translations:

(venv) $ flask translate update

If you are using the Spanish translation, then I have done the translation work for you, so you can just extract the app/translations/es/LC_MESSAGES/messages.po file from the download package for this chapter and add it to your project.

After the translations are done, you have to compile the translation files:

(venv) $ flask translate compile

Deployment Considerations

To complete this chapter, I want to discuss how the deployment of the application changes. To support background tasks I have added two new components to the stack, a Redis server, and one or more RQ workers. Obviously these need to be included in your deployment strategy, so I'm going to briefly go over the different deployment options I covered in previous chapters and how they are affected by these changes.

Deployment on a Linux Server

If you are running your application on a Linux server, adding Redis should be as simple as installing the package provided by your operating system. For Ubuntu Linux, you have to run sudo apt-get install redis-server.

To run the RQ worker process, you can follow the "Setting Up Gunicorn and Supervisor" section in Chapter 17 to create a second Supervisor configuration in which you run rq worker microblog-tasks instead of gunicorn. If you want to run more than one worker (and you probably should for production), you can use Supervisor's numprocs directive to indicate how many instances you want to have running concurrently.
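
As a sketch, a Supervisor configuration for the workers could look something like the following. The file name, paths and user follow the Chapter 17 examples and are assumptions that you will need to adapt to your own server:

/etc/supervisor/conf.d/microblog-tasks.conf: RQ worker configuration (example).

[program:microblog-tasks]
command=/home/ubuntu/microblog/venv/bin/rq worker microblog-tasks
numprocs=2
process_name=%(program_name)s_%(process_num)02d
directory=/home/ubuntu/microblog
user=ubuntu
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true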

Deployment on Heroku

To deploy the application on Heroku you are going to need to add a Redis service to your account. This is similar to the process that I used to add the Postgres database. Redis also has a free tier, which can be added with the following command:

$ heroku addons:create heroku-redis:hobby-dev

The access URL for your new redis service is going to be added to your Heroku environment as a REDIS_URL variable, which is exactly what the application expects.

The free plan in Heroku allows one web dyno and one worker dyno, so you can host a single rq worker along with your application without incurring any expenses. For this you will need to declare the worker on a separate line in your Procfile:

web: flask db upgrade; flask translate compile; gunicorn microblog:app
worker: rq worker -u $REDIS_URL microblog-tasks

After you deploy with these changes, you can start the worker with the following command:

$ heroku ps:scale worker=1

Deployment on Docker

If you are deploying the application to Docker containers, then you first need to create a Redis container. For this you can use one of the official Redis images from the Docker registry:

$ docker run --name redis -d -p 6379:6379 redis:3-alpine

When you run your application you will need to link the redis container and set the REDIS_URL environment variable, similar to how the MySQL container is linked. Here is a complete command to start the application including a redis link:

$ docker run --name microblog -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --link mysql:dbserver --link redis:redis-server \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
    -e REDIS_URL=redis://redis-server:6379/0 \
    microblog:latest

Finally, you will need to run one or more containers for the RQ workers. Because the workers are based on the same code as the main application, you can use the same container image you use for your application, overriding the start up command so that the worker is started instead of the web application. Here is an example docker run command that starts a worker:

$ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --link mysql:dbserver --link redis:redis-server \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
    -e REDIS_URL=redis://redis-server:6379/0 \
    --entrypoint venv/bin/rq \
    microblog:latest worker -u redis://redis-server:6379/0 microblog-tasks

Overriding the default start up command of a Docker image is a bit tricky because the command needs to be given in two parts. The --entrypoint argument takes just the executable name, but the arguments (if any) need to be given after the image and tag, at the end of the command line. Note that rq needs to be given as venv/bin/rq so that it works without having the virtual environment activated.
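
Put together, the entrypoint and the arguments given after the image name make the container run the equivalent of the following command:

venv/bin/rq worker -u redis://redis-server:6379/0 microblog-tasks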

75 comments

  • #26 Miguel Grinberg said 2018-05-29T06:11:00Z

    @Tho: I don't see why not. RQ does not have any database requirements, so there is no conflict with mongoengine.

  • #27 Tho said 2018-05-29T07:41:53Z

    Hi Miguel, Thanks for your comment and your tutorials. I always follow your website as well as buy your book to learn Flask even I use another database. You help me a lot in my app. Thanks again!

  • #28 Marc Holbrook said 2018-05-30T17:00:42Z

    Hi Miguel, Bought your first book and I am loving the additional chapters in your new book. (yet to buy) I have adapted your Notifications and Background Jobs for some long tasks I Have. Adding another task using all the same structure I get the following: 2018-05-30 17:41:48,141 DEBUG | models | root | ***FUNC::::::: app.models.load_user(user_id) 2018-05-30 17:41:53,211 INFO | _internal | werkzeug | 10.16.32.34 - - [30/May/2018 17:41:53] "GET /main/notifications?since=1527698253.322596 HTTP/1.1" 500 - Traceback (most recent call last): . .etc . File "/home/ema-gui/DMSGUI_RQ/app/main/views.py", line 165, in notifications return jsonify([{'name': n.name, 'data': n.get_data(), 'timestamp': n.timestamp} for n in notifications] etc... then: OperationalError: (raised as a result of Query-invoked autoflush; consider using a session.no_autoflush block if this flush is occurring prematurely) (sqlite3.OperationalError) database is locked [SQL: u'UPDATE user SET last_activity=? WHERE user.id = ?'] [parameters: ('2018-05-30 16:41:48.144460', 1)] (Background on this error at: http://sqlalche.me/e/e3q8) At this stage, I can confirm that the task starts in RQ and eventually completes. The Task table is updated for each progress notification. The notifications url calls the notifications function but the query is not returned. This code works in the same application but with a different procedure. Any thoughts? Thanks , Marc

  • #29 Miguel Grinberg said 2018-05-30T18:41:03Z

    @Marc: the important error is "database is locked", everything else that happens comes from that problem. I suspect your long task keeps a database session open for a long time, which prevents other processes to make changes to the database.

  • #30 Cédric BRINER said 2018-06-06T12:10:16Z

    Hi Miguel, Let's imagine that I have an api entry such as "GET /shutdown-virtual-machine/<machine_id>" and that I would like to make this call within the virtual machine that is going to be shutdown by using "curl HOST/shutdown-virtual-machine/<machine_id>" The code of the API will look like @api.route("/shutdown-virtual-machine/<int:machine_id>", methods = ["GET]") def shutdown(machine_id): mylib.shutdown(machine_id) return {"status": "ok"} The problem is that after the function "mylib.shutdown(machine_id)" returns, the OS that execute the "curl" command will be powered off. So curl, will not have the time/ability to display the response. Can I do something like: @api.route("/shutdown-virtual-machine/<int:machine_id>", methods = ["GET]") def shutdown(machine_id): return_value {"status": "ok"} sleep 1 mylib.shutdown(machine_id) return What could be the "good" way of doing so ? cED P.-S. You made an amazing job when promoting/explaining/popularizing flask. Many thanks !

  • #31 Miguel Grinberg said 2018-06-06T22:54:06Z

    @Cédric: very interesting question. What I would do is to schedule the shutdown to happen in, for example, one minute. That gives your route plenty of time to return a response. You can do this with a background thread, for example: def shutdown_machine(machine_id, delay=60): time.sleep(delay) mylib.shutdown(machine_id) @api.route("/shutdown-virtual-machine/<int:machine_id>", methods = ["DELETE]") def shutdown(machine_id): Thread(target=shutdown_machine, args=(machine_id,)).start() return "ok" Also want to suggest you don't use GET for destructive actions. The GET method is used to read information, for shutting down a server I would go with DELETE.

  • #32 Vivek said 2018-06-24T16:10:33Z

    Thanks for the superb article as usual Miguel, Does RQ support rate limiting by any chance? I am planning to fetch data from several external APIs that have a rate limit on them. Can I also run websockets inside RQ workers if I had to say stream data instead of polling them from external APIs Thank you for the reply in advance

  • #33 Miguel Grinberg said 2018-06-25T04:16:48Z

    @Vivek: I don't understand what you mean by rate limiting in this context? Do you want to rate limit the starting of new jobs? I'm not sure that is necessary, jobs go on a queue, and they start whenever there is an RQ worker that is free. Regarding WebSocket, that is also a strange solution, because the RQ workers are not web servers, they are just workers that are supposed to take on relatively short tasks offloaded by the main server. I would handle the WebSocket stuff in the main server, or in a separate server if you prefer to keep HTTP and WebSocket on different apps.

  • #34 Elisabeth said 2018-06-27T02:19:33Z

    Dear Miguel, Thank you so much for the tutorial, it's been great to get help me get my own app running although I feel I still have a lot to learn. I need to have a progress bar for the long running task set up in rq so your solution seems great, however my task modifies the database and I can't have those modifications committed. I need the database to go back to its initial state after the the task is done so I can't commit the notifications. My app gets all the inputs from various forms and puts them in the database. Then a submit runs a task based on all the data entered. The task accesses the database and modifies it (no commits), and returns a result. It doesn't need (or should) allow the client to access other pages while the task is running. The task result was just rendered with html but it takes too long and Heroku times out so it is forcing me use an asynchronous task so I'm using rq now . When I start the task I commit it to the database so I can have a decorator to check if a task is running on all routes and that works, but I can't have have further commits. My problem is I can't get a progress bar working as I am not committing notifications and I don't know how to get the redis queue to communicate with the javascript on the client html page to update the progress. Mind you I wasn't able to get a progress bar working (a real one, that actually got progress from the task) before using rq, so I am definitely missing something on how to communicate back to the client page to update it with javascript. I would appreciate any help you can give me

  • #35 Miguel Grinberg said 2018-06-27T15:10:55Z

    @Elisabeth: what do you expect will happen when you make no commits for all the changes you are making to the database? If you don't commit your changes, then the changes are not in the database, they are just in memory. Nobody will be able to see those changes. Maybe I don't understand what you are trying to do, but I'm guessing you want those changes to be accessible? Why would you make changes if not?

  • #36 Elisabeth said 2018-06-28T13:54:37Z

    Dear Miguel I finally figured it out. To avoid commits, I just don't use the notification system at all and just make a progress bar using the job.meta['progress'] directly. Thanks for the tutorial it has been excellent for me to learn flask, and now I really need to learn javascript because although I got it working, it was quite a struggle. Elisabeth

  • #37 Fred said 2018-06-28T14:55:17Z

    Hi, you explain task with a certain progress. I understand that you have a queue and once one task is finished, the next one is started. However, sometimes you would like to have tasks that wake up every 10s, look if something changed and go to sleep again. Is something like this possible within redis ?

  • #38 Miguel Grinberg said 2018-06-28T15:15:54Z

    @Fred: what would be the point of using Redis or Redis Queue in the case of a periodic task? You can implement that with a cron job, for example, or a background thread or process.

  • #39 Dan said 2018-07-09T06:09:31Z

    Hello Miguel, thanks very much for this great tutorial. When i attempted to start up my rq worker with [rq worker microblog-tasks] I got a strange error: Traceback (most recent call last): File "/Users/Danesh/Projects/python_projects/blue-microblog/venv/bin/rq", line 7, in <module> from rq.cli import main File "/Users/Me/Projects/python_projects/microblog/venv/lib/python3.7/site-packages/rq/__init__.py", line 9, in <module> from .queue import get_failed_queue, Queue File "/Users/Me/Projects/python_projects/microblog/venv/lib/python3.7/site-packages/rq/queue.py", line 61 async=True, job_class=None): ^ SyntaxError: invalid syntax. So it seems the "async" keyword cannot be used as an argument name in Python 3.7 source: (https://github.com/pypa/pipenv/issues/956). I solved the issue by going into the queue.py module an changing the async keyword to async_. Do you think that this is a one off error to do with my particular build or should i report it somewhere? Thanks, Dan

  • #40 Miguel Grinberg said 2018-07-09T16:40:11Z

    @Dan: this appears to be a bug in RQ, which only shows with Python 3.7. It has been reported already: https://github.com/rq/rq/issues/972.

  • #41 Toan Nguyen said 2018-07-10T10:15:48Z

    Hi Miguel, Many thanks for your great tutorial as usual. I've managed to integrate rq into my Flask App in which I launch a 3-5 minute-long simulation task, and then continuously posting to another view in the app to update the task's progress via ajax calls. Everything works well on my development computer. However, I run into a problem when I deploy the app to Ubuntu server on Google Compute Engine (it runs via apache + mod_wsgi.) When I ssh onto the server, and start the worker with "rq worker worker-name", things run as I expected. However, if my ssh session ends, the worker still take jobs (since I can see the job id,) but the added job wouldn't run (the progress not updated.) If I ssh onto the machine and run "rq worker worker-name" again, then the job will start running as usual. My question is: is there a way to keep the worker running without keeping a ssh connection with the server? Your advice would be of great help to me, and hopefully to many others as well. Thanks and best regards,

  • #42 Miguel Grinberg said 2018-07-11T04:56:09Z

    @Toan: you need to start the worker as a service, as shown in the deployment chapters in this tutorial. You can use Supervisor if you like, or systemd is another good choice.

  • #43 Toan Nguyen said 2018-07-12T10:50:56Z

    Dear Miguel, Many thanks for pointing me to your excellent tutorial on deployment. I managed to make it work with gunicorn and supervisor. By the way, it also solves my problems with Flask-APScheduler. The cron jobs in APScheduler only trigger when there is an instance of the app running. Thanks again. Toan

  • #44 Daniel Ferszt said 2018-07-16T16:32:23Z

    Hi Miguel, Thanks for this tutorial, it's the best I've seen so far. I have a question regarding the redis workers... is there a way to rise an event or something when the task is finished without constantly polling for the progress?

  • #45 Miguel Grinberg said 2018-07-16T17:50:06Z

    @Daniel: RQ does not have a notification mechanism, but you can build your own if that is important for your application. You can either do polling in a background thread, or if you want something more efficient, you can use a interprocess locking mechanism, maybe a lock file.

  • #46 David Hagan said 2018-07-27T21:12:38Z

    When running RQ in production, what is a guideline for choosing the number of concurrent worker instances to run? Is it just a tradeoff on resources?

  • #47 Miguel Grinberg said 2018-07-30T20:38:40Z

    @David: Typically you'd want at least one per CPU core. If your tasks are not CPU intensive, you can have more.

  • #48 a137x said 2018-08-04T16:54:44Z

    @Rohan. Make sure that you start your worker with (set in procfile or directly in heroku): rq worker -u $REDIS_URL microblog-tasks

  • #49 John Lopez said 2018-10-02T16:09:14Z

    Hi Miguel - great tutorial. Quick question for you: I'm using Flask-Dance to manage an OAuth token for a user. In a redis task, I want to perform bulk API calls. Do you have a recommendation on how to best pass the OAuth session to redis? Thanks!

  • #50 Miguel Grinberg said 2018-10-02T22:08:33Z

    @John: Not familiar with Flask-Dance, but this "oauth session" that you mention is an application thing, in reality OAuth produces a token that must be stored inside this session object. If you can extract the token, then you can save that to Redis, and then reconstruct the session object with it when you need to make a call.
