2018-05-01T23:17:57Z

The Flask Mega-Tutorial Part XXII: Background Jobs

This is the twenty second installment of the Flask Mega-Tutorial series, in which I'm going to tell you how to create background jobs that run independently of the web server.

Note 1: If you are looking for the legacy version of this tutorial, it's here.

Note 2: If you would like to support my work on this blog, or just don't have patience to wait for weekly articles, I am offering the complete version of this tutorial packaged as an ebook or a set of videos. For more information, visit courses.miguelgrinberg.com.

This chapter is dedicated to the implementation of long or complex processes that need to run as part of the application. These processes cannot be executed synchronously in the context of a request because that would block the response to the client for the duration of the task. I briefly touched on this topic in Chapter 10, when I moved the sending of emails to background threads to prevent the client from having to wait during those 3-4 seconds that it takes to send an email. While using threads for emails is acceptable, this solution does not scale well when the processes in question are much longer. The accepted practice is to offload long tasks to a worker process, or more likely to a pool of them.

To justify the need for having long running tasks, I'm going to introduce an export feature to Microblog through which users will be able to request a data file with all their blog posts. When a user makes use of this option, the application is going to start an export task that will generate a JSON file with all the user's posts, and then send it to the user by email. All this activity is going to happen in a worker process, and while it happens the user will see a notification showing the percentage of completion.

The GitHub links for this chapter are: Browse, Zip, Diff.

Introduction to Task Queues

Task queues provide a convenient solution for the application to request the execution of a task by a worker process. Worker processes run independently of the application and can even be located on a different system. The communication between the application and the workers is done through a message queue. The application submits a job, and then monitors its progress by interacting with the queue. The following diagram shows a typical implementation:

Task Queue Diagram

The most popular task queue for Python is Celery. This is a fairly sophisticated package that has many options and supports several message queues. Another popular Python task queue is Redis Queue, or just RQ, which sacrifices some flexibility (it only supports Redis as its message queue), but in exchange is much simpler to set up than Celery.

Both Celery and RQ are perfectly adequate to support background tasks in a Flask application, so my choice for this application is going to favor the simplicity of RQ. However, implementing the same functionality with Celery should be relatively easy. If you are interested in Celery more than RQ, you can read the Using Celery with Flask article that I have on my blog.

Using RQ

RQ is a standard Python package that is installed with pip:

(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt

As I mentioned earlier, the communication between the application and the RQ workers is going to be carried out in a Redis message queue, so you need to have a Redis server running. There are many options to get a Redis server installed and running, from one-click installers to downloading the source code and compiling it directly on your system. If you are using Windows, Microsoft maintains installers here. On Linux, you can likely get it as a package through your operating system's package manager. Mac OS X users can run brew install redis and then start the service manually with the redis-server command.

You will not need to interact with Redis at all outside of just ensuring that the service is running and accessible to RQ.
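
If you want to confirm that the service is reachable, one quick check (assuming the redis-cli tool that ships with Redis is installed) is to send it a ping:

(venv) $ redis-cli ping
PONG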

Note that RQ does not run on the Windows native Python interpreter. If you are using the Windows platform, you can only run RQ under Unix emulation. The two Unix emulation layers that I recommend to Windows users are Cygwin and the Windows Subsystem for Linux (WSL), and both are compatible with RQ.

Creating a Task

I'm going to show you how to run a simple task through RQ so that you can familiarize yourself with it. A task is nothing more than a Python function. Here is an example task, which I'm going to put in a new app/tasks.py module:

app/tasks.py: Example background task.

import time

def example(seconds):
    print('Starting task')
    for i in range(seconds):
        print(i)
        time.sleep(1)
    print('Task completed')

This task takes a number of seconds as an argument, and then waits that amount of time, printing a counter once a second.

Running the RQ Worker

Now that the task is ready, a worker can be started. This is done with the rq worker command:

(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...

The worker process is now connected to Redis, and watching for any jobs that may be assigned to it on a queue named microblog-tasks. If you want multiple workers for more throughput, all you need to do is run more instances of rq worker, all connected to the same queue. Then when a job shows up in the queue, any of the available worker processes will pick it up. In a production environment you will probably want to have at least as many workers as available CPUs.

Executing Tasks

Now open a second terminal window and activate the virtual environment in it. I'm going to use a shell session to kick off the example() task in the worker:

>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'

The Queue class from RQ represents the task queue as seen from the application side. The arguments it takes are the queue name, and a Redis connection object, which in this case I initialize with a default URL. If you have your Redis server running on a different host or port number, you will need to use a different URL.
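
As a quick sketch of what that would look like, here is the same call for a Redis server on another host and a non-default port (the host and port below are placeholders, not values used in this project):

>>> queue = rq.Queue('microblog-tasks',
...                  connection=Redis.from_url('redis://remote-host:6380/0'))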

The enqueue() method on the queue is used to add a job to the queue. The first argument is the name of the task you want to execute, given directly as a function object, or as an import string. I find the string option much more convenient, as that makes it unnecessary to import the function on the application's side. Any remaining arguments given to enqueue() are going to be passed to the function running in the worker.
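
For comparison, this is a sketch of the same call using a function object instead of the import string. It assumes the app.tasks module is importable from the shell session:

>>> from app.tasks import example
>>> job = queue.enqueue(example, 23)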

As soon as you make the enqueue() call you are going to notice some activity on your first terminal window, the one running the RQ worker. You will see that the example() function is now running, and printing the counter once per second. At the same time, your other terminal is not blocked and you can continue evaluating expressions in the shell. In the example above, I called the job.get_id() method to obtain the unique identifier assigned to the task. Another interesting expression you can try with the job object is to check if the function on the worker has finished:

>>> job.is_finished
False

If you passed a 23 like I did in my example above, then the function is going to run for about 23 seconds. After that time, the job.is_finished expression will become True. Isn't this pretty cool? I really like the simplicity of RQ.

Once the function completes, the worker goes back to waiting for new jobs, so you can repeat the enqueue() call with different arguments if you want to experiment more. The data that is stored in the queue for a task will stay there for some time (500 seconds by default), but will eventually be removed. This is important: the task queue does not preserve a history of executed jobs.
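
If you want the job data to be retained for a different amount of time, RQ's enqueue() accepts a result_ttl argument with the retention time in seconds. As a small sketch:

>>> job = queue.enqueue('app.tasks.example', 23, result_ttl=3600)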

Reporting Task Progress

The example task I have used above is unrealistically simple. Normally, for a long running task you will want some sort of progress information to be made available to the application, which in turn can show it to the user. RQ supports this by using the meta attribute of the job object. Let me rewrite the example() task to write progress reports:

app/tasks.py: Example background task with progress.

import time
from rq import get_current_job

def example(seconds):
    job = get_current_job()
    print('Starting task')
    for i in range(seconds):
        job.meta['progress'] = 100.0 * i / seconds
        job.save_meta()
        print(i)
        time.sleep(1)
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed')

This new version of example() uses RQ's get_current_job() function to get a job instance, which is similar to the one returned to the application when it submits the task. The meta attribute of the job object is a dictionary where the task can write any custom data that it wants to communicate to the application. In this example, I'm writing a progress item that represents the percentage of completion of the task. Each time the progress is updated I call job.save_meta() to instruct RQ to write the data to Redis, where the application can find it.

On the application side (currently just a Python shell), I can run this task and then monitor progress as follows:

>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True

As you can see above, on this side the meta attribute is available to read. The refresh() method needs to be invoked for the contents to be updated from Redis.
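
If you would rather not call refresh() by hand, a small polling loop such as the following (just a convenience for experimenting in the same shell session) does the job:

>>> import time
>>> job = queue.enqueue('app.tasks.example', 10)
>>> while not job.is_finished:
...     job.refresh()
...     print(job.meta.get('progress', 0))
...     time.sleep(2)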

Database Representation of Tasks

For the example above it was enough to start a task and watch it run. For a web application things get a bit more complicated, because once one of these tasks is started as part of a request, that request is going to end, and all the context for that task is going to be lost. Because I want the application to keep track of what tasks each user is running, I need to use a database table to maintain some state. Below you can see the new Task model implementation:

app/models.py: Task model.

# ...
import redis
import rq

class User(UserMixin, db.Model):
    # ...
    tasks = db.relationship('Task', backref='user', lazy='dynamic')

# ...

class Task(db.Model):
    id = db.Column(db.String(36), primary_key=True)
    name = db.Column(db.String(128), index=True)
    description = db.Column(db.String(128))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    complete = db.Column(db.Boolean, default=False)

    def get_rq_job(self):
        try:
            rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
        except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
            return None
        return rq_job

    def get_progress(self):
        job = self.get_rq_job()
        return job.meta.get('progress', 0) if job is not None else 100

An interesting difference between this model and the previous ones is that the id primary key field is a string, not an integer. This is because for this model, I'm not going to rely on the database's own primary key generation and instead I'm going to use the job identifiers generated by RQ.

The model is going to store the task's fully qualified name (as passed to RQ), a description for the task that is appropriate for showing to users, a relationship to the user that requested the task, and a boolean that indicates if the task completed or not. The purpose of the complete field is to separate tasks that ended from those that are actively running, as running tasks require special handling to show progress updates.

The get_rq_job() helper method loads the RQ Job instance for a given task id, which I can get from the model. This is done with Job.fetch(), which loads the Job instance from the data that exists in Redis about it. The get_progress() method builds on top of get_rq_job() and returns the progress percentage for the task. This method makes a couple of interesting assumptions. If the job id from the model does not exist in the RQ queue, that means that the job already finished and the data expired and was removed from the queue, so in that case the percentage returned is 100. At the other extreme, if the job exists, but there is no information associated with the meta attribute, then it is safe to assume that the job is scheduled to run, but did not get a chance to start yet, so in that situation a 0 is returned as progress.

To apply the changes to the database schema, a new migration needs to be generated, and then the database upgraded:

(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade

The new model can also be added to the shell context, to make it accessible in shell sessions without having to import it:

microblog.py: Add Task model to shell context.

from app import create_app, db, cli
from app.models import User, Post, Message, Notification, Task

app = create_app()
cli.register(app)

@app.shell_context_processor
def make_shell_context():
    return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
            'Notification': Notification, 'Task': Task}

Integrating RQ with the Flask Application

The connection URL for the Redis service needs to be added to the configuration:

class Config(object):
    # ...
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'

As always, the Redis connection URL will be sourced from an environment variable, and if the variable isn't defined, a default URL that assumes the service is running on the same host and on the default port will be used.

The application factory function will be in charge of initializing Redis and RQ:

app/__init__.py: RQ integration.

# ...
from redis import Redis
import rq

# ...

def create_app(config_class=Config):
    # ...
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)

    # ...

The app.task_queue is going to be the queue where tasks are submitted. Having the queue attached to the application is convenient because anywhere in the application I can use current_app.task_queue to access it. To make it easy for any part of the application to submit or check on a task, I can create a few helper methods in the User model:

app/models.py: Task helper methods in the user model.

# ...

class User(UserMixin, db.Model):
    # ...

    def launch_task(self, name, description, *args, **kwargs):
        rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
                                                *args, **kwargs)
        task = Task(id=rq_job.get_id(), name=name, description=description,
                    user=self)
        db.session.add(task)
        return task

    def get_tasks_in_progress(self):
        return Task.query.filter_by(user=self, complete=False).all()

    def get_task_in_progress(self, name):
        return Task.query.filter_by(name=name, user=self,
                                    complete=False).first()

The launch_task() method takes care of submitting a task to the RQ queue, along with adding it to the database. The name argument is the function name, as defined in app/tasks.py. When submitting to RQ, the function prepends app.tasks. to this name to build the fully qualified function name. The description argument is a friendly description of the task that can be presented to users. For the function that exports the blog posts, I will set the name to export_posts and the description to Exporting posts.... The remaining arguments are positional and keyword arguments that will be passed to the task. The function begins by calling the queue's enqueue() method to submit the job. The job object that is returned contains the task id assigned by RQ, so I can use that to create a corresponding Task object in my database.

Note that launch_task() adds the new task object to the session, but it does not issue a commit. In general, it is best to operate on the database session in the higher level functions, as that allows you to combine several updates made by lower level functions in a single transaction. This is not a strict rule, and in fact, you are going to see an exception where a commit is issued in a child function later in this chapter.

The get_tasks_in_progress() method returns the complete list of tasks that are outstanding for the user. You will see later that I use this method to include information about running tasks in the pages that are rendered to the user.

Finally, get_task_in_progress() is a simpler version of the previous one that returns a specific task. I prevent users from starting two or more tasks of the same type concurrently, so before I launch a task, I can use this method to find out if a previous task is currently running.
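
To make the intended usage concrete, here is a hypothetical flask shell session showing the database side of these helpers. The username susan is just a placeholder for a user that exists in your database, and the export_posts task itself is written later in this chapter, so the worker would not find the function yet; this only illustrates the bookkeeping:

>>> u = User.query.filter_by(username='susan').first()
>>> task = u.launch_task('export_posts', 'Exporting posts...')
>>> db.session.commit()  # launch_task() adds the Task but does not commit it
>>> u.get_task_in_progress('export_posts') is not None
True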

Sending Emails from the RQ Task

This may seem like a distraction from the main topic, but I said above that when the background export task completes, an email is going to be sent to the user with a JSON file that contains all the posts. The email functionality that I built in Chapter 10 needs to be extended in two ways. First, I need to add support for file attachments, so that I can attach a JSON file. Second, the send_email() function always sends emails asynchronously, using a background thread. When sending an email from a background task, which is already asynchronous, a second level of background threading makes little sense, so I need to support both synchronous and asynchronous email sending.

Luckily, Flask-Mail supports attachments, so all I need to do is extend the send_email() function to accept them in an additional argument, and then configure them in the Message object. And to optionally send the email in the foreground, I just need to add a boolean sync argument:

app/email.py: Send emails with attachments.

# ...

def send_email(subject, sender, recipients, text_body, html_body,
               attachments=None, sync=False):
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    if attachments:
        for attachment in attachments:
            msg.attach(*attachment)
    if sync:
        mail.send(msg)
    else:
        Thread(target=send_async_email,
            args=(current_app._get_current_object(), msg)).start()

The attach() method of the Message class accepts three arguments that define an attachment: the filename, the media type, and the actual file data. The filename is just the name that the recipient will see associated with the attachment, it does not need to be a real file. The media type defines what type of attachment this is, which helps email readers render it appropriately. For example, if you send image/png as the media type, an email reader will know that the attachment is an image, in which case it can show it as such. For the blog post data file I'm going to use the JSON format, which uses an application/json media type. The third and last argument is a string or byte sequence with the contents of the attachment.

To make it simple, the attachments argument to send_email() is going to be a list of tuples, and each tuple is going to have three elements which correspond to the three arguments of attach(). So for each element in this list, I need to send the tuple as arguments to attach(). In Python, if you have a list or tuple with arguments that you want to send to a function, you can use func(*args) to have that list expanded into the actual argument list, instead of having to use a more tedious syntax such as func(args[0], args[1], args[2]). So for example, if you have args = [1, 'foo'], the call will send two arguments, same as if you called func(1, 'foo'). Without the *, the call would have a single argument which would be the list.
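
To make that concrete, here is a tiny sketch of how one attachment tuple expands into the three arguments of attach(), assuming msg is a Flask-Mail Message instance:

attachment = ('posts.json', 'application/json', '{"posts": []}')
msg.attach(*attachment)  # same as msg.attach('posts.json', 'application/json', '{"posts": []}')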

As for sending the email synchronously, all I needed to do was revert to calling mail.send(msg) directly when sync is True.

Task Helpers

While the example() task I used above was a simple standalone function, the function that exports blog posts is going to need some of the functionality I have in the application, like access to the database and the email sending function. Because this is going to run in a separate process, I need to initialize Flask-SQLAlchemy and Flask-Mail, which in turn need a Flask application instance from which to get their configuration. So I'm going to add a Flask application instance and application context at the top of the app/tasks.py module:

app/tasks.py: Create application and context.

from app import create_app

app = create_app()
app.app_context().push()

The application is created in this module because this is the only module that the RQ worker is going to import. When you use the flask command, the microblog.py module in the root directory creates the application, but the RQ worker knows nothing about that, so it needs to create its own application instance if the task functions need it. You have seen the app.app_context() method in a couple of places already; pushing a context makes the application the "current" application instance, which enables extensions such as Flask-SQLAlchemy to use current_app.config to obtain their configuration. Without the context, the current_app expression would return an error.

I then started thinking about how I was going to report progress while this function is running. In addition to passing progress information through the job.meta dictionary, I'd like to push notifications to the client, so that the completion percentage can be updated dynamically without the user having to refresh the page. For this I'm going to use the notification mechanisms I built in Chapter 21. The updates are going to work in a very similar way to the unread messages badge. When the server renders a template, it will include "static" progress information obtained from job.meta, but then once the page is on the client's browser, the notifications are going to dynamically update the percentage using notifications. Because of the notifications, updating the progress of a running task is going to be slightly more involved than how I did it in the previous example, so I'm going to create a wrapper function dedicated to updating progress:

app/tasks.py: Set task progress.

from rq import get_current_job
from app import db
from app.models import Task

# ...

def _set_task_progress(progress):
    job = get_current_job()
    if job:
        job.meta['progress'] = progress
        job.save_meta()
        task = Task.query.get(job.get_id())
        task.user.add_notification('task_progress', {'task_id': job.get_id(),
                                                     'progress': progress})
        if progress >= 100:
            task.complete = True
        db.session.commit()

The export task can call _set_task_progress() to record the progress percentage. The function first writes the percentage to the job.meta dictionary and saves it to Redis, then it loads the corresponding task object from the database and uses task.user to push a notification to the user that requested the task, using the existing add_notification() method. The notification is going to be named task_progress, and the data associated with it is going to be a dictionary with two items, the task identifier and the progress number. Later I will add JavaScript code to act on this new notification type.

The function checks if the progress indicates that the task has completed, and in that case also updates the complete attribute of the task object in the database. The database commit call ensures that the task and the notification object added by add_notification() are both saved immediately to the database. I needed to be very careful in how I designed the parent task to not make any database changes, since this commit call would write those as well.

Implementing the Export Task

Now all the pieces are in place for me to write the export function. The high level structure of this function is going to be as follows:

app/tasks.py: Export posts general structure.

def export_posts(user_id):
    try:
        # read user posts from database
        # send email with data to user
    except:
        # handle unexpected errors
    finally:
        # handle clean up

Why wrap the whole task in a try/except block? The application code that exists in request handlers is protected against unexpected errors because Flask itself catches exceptions and then handles them observing any error handlers and logging configuration I have set up for the application. This function, however, is going to run in a separate process that is controlled by RQ, not Flask, so if any unexpected errors occur the task will abort, RQ will display the error to the console and then will go back to wait for new jobs. So basically, unless you are watching the output of the RQ worker or logging it to a file, you will never find out there was an error.

Let's start looking at the sections indicated with comments above with the simplest ones, which are the error handling and clean up at the end:

app/tasks.py: Export posts error handling.

import sys
# ...

def export_posts(user_id):
    try:
        # ...
    except:
        app.logger.error('Unhandled exception', exc_info=sys.exc_info())
    finally:
        _set_task_progress(100)

Whenever an unexpected error occurs, I'm going to use the logger object from the Flask application to log the error, along with the stack trace, information which is provided by the sys.exc_info() call. The nice thing about using the Flask application logger to log errors here as well is that any logging mechanisms you have implemented for the Flask application will be observed. For example, in Chapter 7 I configured errors to be sent out to the administrator email address. Just by using app.logger I also get that behavior for these errors. In the finally clause, which will run both for errored and successful runs, I mark the task as finished by setting the progress to 100%.

Next, I'm going to code the actual export, which simply issues a database query and walks through the results in a loop, accumulating them in a dictionary:

app/tasks.py: Read user posts from the database.

import time
from app.models import User, Post

# ...

def export_posts(user_id):
    try:
        user = User.query.get(user_id)
        _set_task_progress(0)
        data = []
        i = 0
        total_posts = user.posts.count()
        for post in user.posts.order_by(Post.timestamp.asc()):
            data.append({'body': post.body,
                         'timestamp': post.timestamp.isoformat() + 'Z'})
            time.sleep(5)
            i += 1
            _set_task_progress(100 * i // total_posts)

        # send email with data to user
    except:
        # ...
    finally:
        # ...

For each post, the function is going to include a dictionary with two elements, the post body and the time the post was written. The time is going to be written in the ISO 8601 standard. The Python datetime objects that I'm using do not store a timezone, so after I export the time in ISO format I add the 'Z', which indicates UTC.
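
As a small example of the resulting timestamp format, using a made-up date:

>>> from datetime import datetime
>>> datetime(2018, 5, 1, 23, 17, 57).isoformat() + 'Z'
'2018-05-01T23:17:57Z'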

The code gets slightly complicated due to the need to keep track of progress. I maintain the counter i, and I need to issue an extra database query before I enter the loop for total_posts to have the number of posts. Using i and total_posts, each loop iteration can update the task progress with a number from 0 to 100.

You may have noticed that I also added a time.sleep(5) call in each loop iteration. The main reason I added the sleep is to make the export task last longer, and be able to see the progress go up even when the export covers just a handful of blog posts.

Below you can see the last part of the function, which sends an email to the user with all the information collected in data as an attachment:

app/tasks.py: Email posts to user.

import json
from flask import render_template
from app.email import send_email

# ...

def export_posts(user_id):
    try:
        # ...

        send_email('[Microblog] Your blog posts',
                sender=app.config['ADMINS'][0], recipients=[user.email],
                text_body=render_template('email/export_posts.txt', user=user),
                html_body=render_template('email/export_posts.html', user=user),
                attachments=[('posts.json', 'application/json',
                              json.dumps({'posts': data}, indent=4))],
                sync=True)
    except:
        # ...
    finally:
        # ...

This is simply a call to the send_email() function. The attachment is defined as a tuple with the three elements that are then passed to the attach() method of Flask-Mail's Message object. The third element in the tuple is the attachment contents, which are generated with Python's json.dumps() function.

There are a pair of new templates referenced here, which provide the contents of the email body in plain text and HTML form. Here is the text template:

app/templates/email/export_posts.txt: Export posts text email template.

Dear {{ user.username }},

Please find attached the archive of your posts that you requested.

Sincerely,

The Microblog Team

Here is the HTML version of the email:

app/templates/email/export_posts.html: Export posts HTML email template.

<p>Dear {{ user.username }},</p>
<p>Please find attached the archive of your posts that you requested.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>

Export Functionality in the Application

All the core pieces to support the background export tasks are now in place. What remains is to hook up this functionality to the application, so that users can place requests for their posts to be emailed to them.

Below you can see a new export_posts view function:

app/main/routes.py: Export posts route and view function.

@bp.route('/export_posts')
@login_required
def export_posts():
    if current_user.get_task_in_progress('export_posts'):
        flash(_('An export task is currently in progress'))
    else:
        current_user.launch_task('export_posts', _('Exporting posts...'))
        db.session.commit()
    return redirect(url_for('main.user', username=current_user.username))

The function first checks if the user has an outstanding export task, and in that case just flashes a message. It really makes no sense to have two export tasks for the same user at the same time, so this is prevented. I can check for this condition using the get_task_in_progress() method I implemented earlier.

If the user isn't already running an export, then launch_task() is invoked to start one. The first argument is the name of the function that will be passed to the RQ worker, prefixed with app.tasks.. The second argument is just a friendly text description that will be shown to the user. Both values are written to the Task object in the database. The function ends with a redirect to the user profile page.

Now I need to expose a link to this route that the user can access to request the export. I think the most appropriate place to do this is in the user profile page, where the link can only be shown when users view their own page, right below the "Edit your profile" link:

app/templates/user.html: Export link in user profile page.

                ...
                <p>
                    <a href="{{ url_for('main.edit_profile') }}">
                        {{ _('Edit your profile') }}
                    </a>
                </p>
                {% if not current_user.get_task_in_progress('export_posts') %}
                <p>
                    <a href="{{ url_for('main.export_posts') }}">
                        {{ _('Export your posts') }}
                    </a>
                </p>
                {% endif %}
                ...

This link is tied to a conditional, because I don't want it to appear when the user already has an export in progress.

At this point the background jobs should be functional, but without giving any feedback to the user. If you want to try this out, you can start the application and the RQ worker as follows:

  • Make sure you have Redis running
  • In a first terminal window, start one or more instances of the RQ worker. For this you have to use the command rq worker microblog-tasks
  • In a second terminal window, start the Flask application with flask run (remember to set FLASK_APP first)

Progress Notifications

To wrap up this feature I want to inform the user when a background task is running, including the percentage of completion. In looking through the Bootstrap component options, I decided to use an alert below the navigation bar for this. Alerts are colored horizontal bars that display information to the user. The blue alert boxes are what I'm using to render flashed messages. Now I'm going to add a green one to show progress status. Below you can see how that is going to look:

Progress Alert

app/templates/base.html: Export progress alert in base template.

...
{% block content %}
    <div class="container">
        {% if current_user.is_authenticated %}
        {% with tasks = current_user.get_tasks_in_progress() %}
        {% if tasks %}
            {% for task in tasks %}
            <div class="alert alert-success" role="alert">
                {{ task.description }}
                <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
            </div>
            {% endfor %}
        {% endif %}
        {% endwith %}
        {% endif %}
        ...
{% endblock %}
...

The way the task alerts are rendered is almost identical to the flashed messages. The outer conditional skips all the alert related markup when the user is not logged in. For logged in users, I get the currently in-progress task list by calling the get_tasks_in_progress() method I created earlier. In the current version of the application I will only get one result at most, since I don't allow more than one active export at a time, but in the future I may want to support other types of tasks that can coexist, so writing this in a generic way could save me time later.

For each task I write an alert element to the page. The color of the alert is controlled with the second CSS class, which in this case is alert-success, while in the case of the flashed messages it was alert-info. The Bootstrap documentation includes the details on the HTML structure for the alerts. The text of the alert includes the description field stored in the Task model, followed by the completion percentage.

The percentage is wrapped in a <span> element that has an id attribute. The reason for this is that I'm going to refresh the percentage from JavaScript when notifications are received. The id that I'm using for a given task is constructed as the task id with -progress appended at the end. When a notification arrives, it will contain the task id, so I can easily locate the correct <span> element to update with a selector for #<task.id>-progress.

If you try the application at this point, you are going to see "static" progress updates, each time you navigate to a new page. You will notice that after you start an export task, you can freely navigate to different pages of the application, and the state of the running task is always recalled.

To prepare for applying dynamic updates to the percentage <span> elements, I'm going to write a little helper function in the JavaScript side:

app/templates/base.html: Helper function to dynamically update task progress.

...
{% block scripts %}
    ...
    <script>
        ...
        function set_task_progress(task_id, progress) {
            $('#' + task_id + '-progress').text(progress);
        }
    </script>
    ...
{% endblock %}

This function takes a task id and a progress value, and uses jQuery to locate the <span> element for this task and write the new progress as its contents. There is really no need to verify if the element exists on the page, because jQuery will do nothing if no elements are located with the given selector.

The notifications are already arriving to the browser because the _set_task_progress() function in app/tasks.py calls add_notification() each time the progress is updated. If you are confused about how these notifications could be reaching the browser without me having to do anything, it's really because in Chapter 21 I was wise to implement the notifications feature in a completely generic way. Any notifications that are added through the add_notification() method will be seen by the browser when it periodically asks the server for notification updates.

But the JavaScript code that processes these notifications only recognizes those that have an unread_message_count name, and ignores the rest. What I need to do now is expand that function to also handle task_progress notifications by calling the set_task_progress() function I defined above. Here is an updated version of the loop that processes notifications from JavaScript:

app/templates/base.html: Notification handler.

                        for (var i = 0; i < notifications.length; i++) {
                            switch (notifications[i].name) {
                                case 'unread_message_count':
                                    set_message_count(notifications[i].data);
                                    break;
                                case 'task_progress':
                                    set_task_progress(
                                        notifications[i].data.task_id,
                                        notifications[i].data.progress);
                                    break;
                            }
                            since = notifications[i].timestamp;
                        }

Now that I need to handle two different notifications, I decided to replace the if statement that checked for the unread_message_count notification name with a switch statement that contains one section for each of the notifications I need to support. If you are not very familiar with the "C" family of languages you may not have seen switch statements before. These provide a convenient syntax that replaces a long chain of if/else if statements. This is nice because as I need to support more notifications, I can simply keep adding them as additional case blocks.

If you recall, the data that the RQ task attaches to the task_progress notification is a dictionary with two elements, task_id and progress, which are the two arguments that I need to use to invoke set_task_progress().

If you run the application now, the progress indicator in the green alert box is going to refresh every 10 seconds, as notifications are delivered to the client.

Because I have introduced new translatable strings in this chapter, the translation files need to be updated. If you are maintaining a non-English language file, you need to use Flask-Babel to refresh your translation files and then add new translations:

(venv) $ flask translate update

If you are using the Spanish translation, then I have done the translation work for you, so you can just extract the app/translations/es/LC_MESSAGES/messages.po file from the download package for this chapter and add it to your project.

After the translations are done, you have to compile the translation files:

(venv) $ flask translate compile

Deployment Considerations

To complete this chapter, I want to discuss how the deployment of the application changes. To support background tasks I have added two new components to the stack, a Redis server, and one or more RQ workers. Obviously these need to be included in your deployment strategy, so I'm going to briefly go over the different deployment options I covered in previous chapters and how they are affected by these changes.

Deployment on a Linux Server

If you are running your application on a Linux server, adding Redis should be as simple as installing this package from your operating system. For Ubuntu Linux, you have to run sudo apt-get install redis-server.

To run the RQ worker process, you can follow the "Setting Up Gunicorn and Supervisor" section in Chapter 17 to create a second Supervisor configuration in which you run rq worker microblog-tasks instead of gunicorn. If you want to run more than one worker (and you probably should for production), you can use Supervisor's numprocs directive to indicate how many instances you want to have running concurrently.
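
As a rough sketch of what that second configuration could look like, here is a Supervisor program section for two workers. The installation path and user name are assumptions based on the Ubuntu layout used in Chapter 17, so adjust them to your own setup:

[program:rq-worker]
command=/home/ubuntu/microblog/venv/bin/rq worker microblog-tasks
numprocs=2
; when numprocs is greater than 1, process_name must include the process number
process_name=%(program_name)s_%(process_num)02d
directory=/home/ubuntu/microblog
user=ubuntu
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true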

Deployment on Heroku

To deploy the application on Heroku you are going to need to add a Redis service to your account. This is similar to the process that I used to add the Postgres database. Redis also has a free tier, which can be added with the following command:

$ heroku addons:create heroku-redis:hobby-dev

The access URL for your new redis service is going to be added to your Heroku environment as a REDIS_URL variable, which is exactly what the application expects.

The free plan in Heroku allows one web dyno and one worker dyno, so you can host a single rq worker along with your application without incurring any expenses. For this you will need to declare the worker in a separate line in your Procfile:

web: flask db upgrade; flask translate compile; gunicorn microblog:app
worker: rq worker -u $REDIS_URL microblog-tasks

After you deploy with these changes, you can start the worker with the following command:

$ heroku ps:scale worker=1

Deployment on Docker

If you are deploying the application to Docker containers, then you first need to create a Redis container. For this you can use one of the official Redis images from the Docker registry:

$ docker run --name redis -d -p 6379:6379 redis:3-alpine

When you run your application you will need to link the redis container and set the REDIS_URL environment variable, similar to how the MySQL container is linked. Here is a complete command to start the application including a redis link:

$ docker run --name microblog -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --link mysql:dbserver --link redis:redis-server \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
    -e REDIS_URL=redis://redis-server:6379/0 \
    microblog:latest

Finally, you will need to run one or more containers for the RQ workers. Because the workers are based on the same code as the main application, you can use the same container image you use for your application, overriding the start up command so that the worker is started instead of the web application. Here is an example docker run command that starts a worker:

$ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --link mysql:dbserver --link redis:redis-server \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
    -e REDIS_URL=redis://redis-server:6379/0 \
    --entrypoint venv/bin/rq \
    microblog:latest worker -u redis://redis-server:6379/0 microblog-tasks

Overriding the default start up command of a Docker image is a bit tricky because the command needs to be given in two parts. The --entrypoint argument takes just the executable name, but the arguments (if any) need to be given after the image and tag, at the end of the command line. Note that rq needs to be given as venv/bin/rq so that it works without having the virtual environment activated.

179 comments

  • #151 Marcus Smith said 2020-06-12T18:28:05Z

    Hello Miguel!

    Very good tutorial! I have not quite made it to this part of it yet, but I'm already developing some flask apps and have even deployed some. For now, most frequently my app receives a request to basically do some stuff that should take some time, but returning OK response immediately is preferred.

    I basically want to receive request, get params from it, start the task with these params asynchronously and immediately return OK response. Typically, these tasks send their own HTTP requests. Right now, on development server, I'm just submitting those tasks to ThreadPoolExecutor and using returned futures.

    Not sure how this will hold up in production environment, since all threads belong to the view function process which in that case is not controlled by myself. For all I know, the process might be stopped as soon as view function returns, or it may crash while handling some other request. And those tasks will be left half-finished.

    For now these tasks are fairly simple, so I'm not quite willing just yet to introduce things like task queues described in this tutorial. Maybe I'm getting something wrong (WSGI stuff confuses me somewhat), maybe I worry too much.

    Anyway, would be glad to hear your input on this,

  • #152 Miguel Grinberg said 2020-06-13T19:56:55Z

    @Michael: what is the problem with your RQ worker? I can't really diagnose the issue without knowing what the issue is. You can also post the question on Stack Overflow if you prefer.

  • #153 Miguel Grinberg said 2020-06-15T15:49:56Z

    @Marcus: when you are using a thread executor the thread pool belongs to the executor. If you create the executor in the global scope, then you can launch a job and return from your view function, and the job will continue to run in its thread.

  • #154 Artur said 2020-06-16T17:29:34Z

    Hello Miguel, maybe my question is obvious but why you didn't run export in Thread like a mailing functionality? What is main advantage of RQ queue in comparison to Thread?

  • #155 Miguel Grinberg said 2020-06-17T10:56:42Z

    @Artur: a job queue runs a pool of workers, so there is a limit to the amount of background work that can happen at any given time. Starting a background thread every time you need to job done can lead to problems if you start too many. Also a job queue provides mechanisms to pass progress and results back to the calling application, while a background thread needs this functionality implemented by the application.

  • #156 David said 2020-06-27T12:08:18Z

    Hi, Miguel;

    I appreciate your tutorials and contribution to the community. I have two questions--I'll ask the second in separate thread for clarity.

    I have a background task that I schedule with APScheduler, which I set up to run daily as an asynchronous task, loading it in with init.py. I only want to run the task scheduling once, though the task will run every day. Is this bad practice? I'm using this on a production server with Gunicorn and Nginx. My concern is that the task generation will occur multiple times.

  • #157 David said 2020-06-27T12:17:48Z

    Question 2--

    I set up a simple queue system, as shown below--this was before I found this tutorial, which I didn't find in the book. All I needed from the queue was to limit running a certain type of CPU and memory-intensive processes to one. Can you see any pros/cons from this?

    settings.py

    from multiprocessing import Queue

    ....

    def init():
        global crawl_queue
        crawl_queue = Queue()

    init.py

    ...

    def process_crawling_queue():
        ...
        while True:
            size = settings.crawl_queue.qsize()
            if size:
                p_name, *args = settings.crawl_queue.get()
                if p_name == '':
                    with app.app_context():
                        ...
                elif p_name == "":
                    with app.app_context():
                        ...
                else:
                    # report errors (task not recognized)
                    ...
            time.sleep(3)

  • #158 Miguel Grinberg said 2020-06-27T22:15:33Z

    @David: on the first question, my opinion is that APScheduler is a bad choice to run in a Flask app. When you scale your app to multiple workers you are going to have to do crazy things to prevent multiple copies of your scheduler from running. I actually have an article in the works regarding scheduled jobs that should be out in about 2-3 weeks. Keep an eye on the blog!

    On the 2nd question, I assume you meant to use the queue to process only one job at a time? When you scale your server to multiple workers you are also going to have issues with this. I see this often, people try to put everything inside the Flask process. I think a distributed design would work better. You can use Celery or RedisQueue and have a single task worker instead.

  • #159 Tom Robinson said 2020-07-26T16:57:58Z

    Hi Miguel,

    I am struggling with deployment to heroku after the changes from the past few chapters. Heroku logs is telling me the following:

    2020-07-26T16:31:27.066054+00:00 app[web.1]: sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) relation "message" does not exist
    2020-07-26T16:31:27.066054+00:00 app[web.1]: LINE 3: FROM message
    2020-07-26T16:31:27.066054+00:00 app[web.1]: ^
    2020-07-26T16:31:27.066054+00:00 app[web.1]:
    2020-07-26T16:31:27.066055+00:00 app[web.1]: [SQL: SELECT count(*) AS count_1
    2020-07-26T16:31:27.066055+00:00 app[web.1]: FROM (SELECT message.id AS message_id, message.sender_id AS message_sender_id, message.recipient_id AS message_recipient_id, message.body AS message_body, message.timestamp AS message_timestamp
    2020-07-26T16:31:27.066056+00:00 app[web.1]: FROM message
    2020-07-26T16:31:27.066056+00:00 app[web.1]: WHERE %(param_1)s = message.recipient_id AND message.timestamp > %(timestamp_1)s) AS anon_1]
    2020-07-26T16:31:27.066056+00:00 app[web.1]: [parameters: {'param_1': 1, 'timestamp_1': datetime.datetime(1900, 1, 1, 0, 0)}]
    2020-07-26T16:31:27.066057+00:00 app[web.1]: (Background on this error at: http://sqlalche.me/e/f405)

    I am guessing this is something to do with the database not upgrading correctly, but 'flask db upgrade' runs with the dyno.

    Do you have any insight on this?

  • #160 Miguel Grinberg said 2020-07-27T14:48:31Z

    @Tom: the "message" table does not exist in your Heroku Postgres database. Any chance you forgot to commit a migration file to your repo? If it is not committed, then Heroku will not see it when you push an upgrade.

  • #161 Edmund said 2020-09-06T15:55:20Z

    Wonderful stuff as ever Miguel. I wonder if you could elaborate on your #18 message. I think I am in a similar position, where I want to write a script that accesses the same db in the same way as the rest of the application (and will eventually send out a weekly reminder email). I'll want to run it on heroku later in a scheduled way, but I'm struggling to work out where to put the .py script and what I need to import/include to make it runnable. I've tried just creating a reminder.py file in the app folder (next to routes.py etc). But when I try to import "from app.models import *" and run the file in pycharm it complains straight away about that line 1 import. With errors such as "ModuleNotFoundError: No module named 'email.utils'; 'email' is not a package" and "ModuleNotFoundError: No module named 'SocketServer'" even though I've not written anything to do with emails yet. Just a simple select/query statement to get a value from the db (but it fails at the import).

    I wonder if you can point me in the right direction to write a scheduled reminder script alongside the application basically. Thank you!

  • #162 Miguel Grinberg said 2020-09-06T23:01:01Z

  • #163 Jurg said 2020-09-10T14:15:07Z

    Hi Miguel,

    great tutorial - thank you! I have an idea - probably it would be a more interesting exercise to render all the posts into a PDF report. What would be the process and the lightest Python libraries (say, some use xhtml2pdf, https://plotly.com/python/v3/pdf-reports/)?

    regards, Jurg

  • #164 Miguel Grinberg said 2020-09-10T14:33:21Z

    @Jurg: I'm not very familiar with the PDF generator packages for Python, but any of them should work. You can also shell out to an external html to pdf converter such as pandoc if you prefer.

  • #165 Androgen said 2020-09-14T08:58:00Z

    Hi Miguel,

    I'm having the same problem as some of the other commenters that the REDIS_URL is not being registered properly (it's properly set, just not being read), and the connection is being made to localhost instead. This is for Heroku. I noticed that your suggested procfile code differs from what's in your Github repo: "worker: rq worker -u $REDIS_URL microblog-tasks" vs. "worker: rq worker microblog-tasks". Which one of these is the correct one?

  • #166 Miguel Grinberg said 2020-09-14T14:52:53Z

    @Androgen: if you use a non-default Redis URL, then you have to add the -u $REDIS_URL option. That is required for Heroku.

  • #167 Kieron Spearing said 2020-09-23T15:59:02Z

    Hey miguel

    I am recieving the following error when running the executing tasks part:

    15:28:03 Worker rq:worker:ee3c7f7c1a1c44a98a6bebe6160bd638: found an unhandled exception, quitting...
    Traceback (most recent call last):
      File "c:\users\diana\pythonworkspace\mega\venv\lib\site-packages\rq\worker.py", line 549, in work
        self.execute_job(job, queue)
      File "c:\users\diana\pythonworkspace\mega\venv\lib\site-packages\rq\worker.py", line 773, in execute_job
        self.fork_work_horse(job, queue)
      File "c:\users\diana\pythonworkspace\mega\venv\lib\site-packages\rq\worker.py", line 698, in fork_work_horse
        child_pid = os.fork()
    AttributeError: module 'os' has no attribute 'fork'

    I have installed the Windows subsystem for Linux with ubuntu on my system.

    After receiving this I installed Cygwin as well, to see if the issue was there, and I'm not quite sure why I am receiving this.

  • #168 Miguel Grinberg said 2020-09-23T16:07:34Z

    @Kieron: yes, this is because RQ does not work under Windows, this is noted above. To use RQ on Windows you have to run it under the WSL or Cygwin.

  • #169 Kieron Spearing said 2020-09-23T16:32:10Z

    Having set up Cygwin, would I need to run it in a Git Bash terminal instead of in VS Code, or how would you run it if I have installed either of these?

  • #170 Miguel Grinberg said 2020-09-23T18:43:40Z

    @Kieron: both Cygwin and WSL come with bash, so you'll run it there, or in any other Unix shell you like and install to replace bash, such as zsh, etc. Gitbash is something different, I'm not very familiar with it so I don't know how far it goes in emulating a Unix environment. Probably not far enough.

  • #171 Yasmin Amran said 2020-10-04T15:47:21Z

    If I'm starting a brand new project, do you recommend starting it with Quart? Is it possible to deploy Quart on Heroku? What do you recommend doing in order to avoid the 30 second timeout error on Heroku?

  • #172 Miguel Grinberg said 2020-10-04T16:09:04Z

    @Yasmin: Why did you get the idea that I would recommend Quart? I have never used it for anything other than little tests, so no, I'm not in a position to recommend Quart over Flask. Also, async development is very hard to do right, so I wouldn't recommend you take that route not only with Quart but with any async framework unless you are experienced in async development.

    Deploying Quart to Heroku should be possible, but I have never tried it myself.

    To avoid the Heroku timeout I recommend that you don't write endpoints that take more than 30 seconds to run. If you have long requests then do them in a background worker using Celery, RQ or similar.

  • #173 Phil said 2020-10-11T15:14:13Z

    Hey Miguel,

    Your mega-tutorial has helped me a lot in my understanding and ultimately contributed to the adoption of web based services in the company I work for. Thank you!

    I am a mechanical/structural engineer in a large company and develop services for fellow engineers. We often deal with data pre- and post-processing activities around commercial off the shelf applications (numerical simulation applications to simulate structural response, stress, deformation, ...). These pre- and post-processing activities often have run-times in excess of any well-behaved response- and request-cycles, and are implemented as individual (micro-) services or are part of a workflow in a service. The numerical simulation can also last several hours, sometimes days, and is also part of various workflows (so post-processing depends on successful completion of the simulation task).

    So, we have the need for task queues all over the place. Spinning up multiple redis-servers as dedicated message brokers for associated services sounds a bit expensive... If I accept this as a single point of failure, can I use one redis-server and manage different service task queues via

    a) unique queue names and/or b) the database SELECT index (0-15)?

    Do you have any experience to share? Is there a preferred solution (a over b or vice versa)? Is there an alternative or better design?

  • #174 Miguel Grinberg said 2020-10-11T18:45:35Z

    @Phil: Using a single redis server (ideally built as a cluster for high availability) should be fine, as long as the queue names are different.

  • #175 David said 2020-10-20T23:46:48Z

    Hi Miguel,

    Thanks a lot for this tutorial.

    I intend to implement sessions within this app so user sessions can timeout after a period of inactivity. Do you have any article that addresses this specifically?
