Celery for Task Management with Flask and SQS

Zachary Smith
Published in Level Up Coding · Jul 21, 2020

Assumed background knowledge

This article assumes the reader has familiarity with Python, Flask, Celery, and AWS SQS.

Introduction

The fundamental thing to grasp when building a Flask app that uses Celery for asynchronous task management is that there are really three parts to consider, outside of the queue and result backends. These are (1) the Flask instance, which is your web or micro-service frontend, (2) the Celery instance, which feeds tasks to the queue, and (3) the Celery worker, which pulls tasks off the queue and completes the work. The Flask and Celery instances are deployed together and work in tandem at the interface of the application. The Celery worker is deployed separately and works effectively independently of the instances.

At first glance, setting up an application that uses these three components appears very simple. The complication arises, however, when you try to implement the Flask instance and Celery instance using the Flask application factory pattern, because that approach causes a circular import issue.

The objectives of this article are to:

  1. clarify how to initialize these three parts of the Flask+Celery service
  2. explain how to containerize the pieces of these services for deployment to a production environment
  3. point out some notes on best practices

Set up the 3 components of a Flask+Celery service

To avoid the circular import we point `factories.celery_instance` and `tasks.long_tasks` at a Celery instance placeholder. The placeholder receives its configuration from the instance factories at runtime.

Assume that to implement our solution we set up a project with a structure similar to the one below:

application.py
worker.py
celery_holder.py
factories/
    flask_instance.py
    celery_instance.py
blueprints/
    controller.py
tasks/
    long_tasks.py

Here `application.py` is our application that initializes the Flask instance and Celery instance to provide the part of the web/micro-service that we interact with. The Celery placeholder is provided in `celery_holder.py`, and the Celery worker is initialized through `worker.py`. The application factories are provided in `factories/`, the Flask blueprint in `blueprints/`, and the Celery tasks in `tasks/`. Now the import pattern will look something like this:
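application.py, worker.py
    -> factories/flask_instance.py
        -> blueprints/controller.py
            -> tasks/long_tasks.py
                -> celery_holder.py
    -> factories/celery_instance.py
        -> celery_holder.py

(each arrow reads as "imports")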

Showing the code should make the process clearer. Let's start at the bottom of the import diagram with the code for the Celery placeholder. This placeholder gives the task file and the Celery factory file a Celery instance that can be imported and referenced, in task decorators and during the Celery class extension respectively.

# celery_holder.py
from celery import Celery

celery = Celery(__name__, include=['tasks.long_tasks'])

Here we demonstrate a Celery task.

# tasks/long_tasks.py
import time

from celery_holder import celery


@celery.task
def celery_long_task(duration):
    for i in range(duration):
        print("Working... {}/{}".format(i + 1, duration))
        time.sleep(2)
        if i == duration - 1:
            print('Completed work on {}'.format(duration))

That task is called within an endpoint defined in a Flask blueprint.

# blueprints/controller.py
from flask import Blueprint

from tasks.long_tasks import celery_long_task


sqs = Blueprint('sqs', __name__)


@sqs.route('/celery/<int:duration>', methods=['GET'])
def add_celery_task(duration):
    celery_long_task.delay(duration)
    return 'Task queued'
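Once the blueprint is registered under the `/sqs` prefix (done in the Flask factory below) and the application is running, queuing a task is a plain GET request. For example, assuming the 8000:5000 port mapping from the docker-compose file later in the article:

curl http://localhost:8000/sqs/celery/5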

Then we come to the factory that creates the Flask instance with all the necessary configuration specified.

# factories/flask_instance.py
from flask import Flask
import os
import logging

from blueprints.controller import sqs


def create_app() -> Flask:
    app = Flask(__name__)

    # Always use dev config
    app.config.from_object('config_dev')

    # Set logging
    log_file = app.config['LOG_LOC'] + app.config['LOG_FILE']
    logging.basicConfig(
        level=app.config['LOG_LEVEL'],
        format=('%(levelname)s %(asctime)s %(name)s '
                'Lrn %(threadName)s: %(message)s'),
        datefmt='%Y-%m-%d %H:%M:%S',
        filename=log_file
    )

    # Register blueprints
    app.register_blueprint(sqs, url_prefix='/sqs')

    return app

Then comes the factory that extends the Celery task class to give it the Flask app context.

# factories/celery_instance.py
import celery_holder

# Imported for type hinting
from flask import Flask
from celery import Celery


def configure_celery(app: Flask) -> Celery:
    """Configure the Celery instance using config from the Flask app."""
    TaskBase = celery_holder.celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery_holder.celery.conf.update(app.config)
    celery_holder.celery.Task = ContextTask

    return celery_holder.celery

Finally, we come to the two files that are called to actually run our application and our worker.

# application.py
from factories.flask_instance import create_app
from factories.celery_instance import configure_celery

# Imported for type hinting
from flask import Flask
from celery import Celery


def create_full_app() -> Flask:
    app: Flask = create_app()
    # Configured for its side effects on the Celery placeholder
    cel_app: Celery = configure_celery(app)
    return app

and our worker.

# worker.py
from factories.flask_instance import create_app
from factories.celery_instance import configure_celery

# Imported for type hinting
from flask import Flask
from celery import Celery

app: Flask = create_app()
celery: Celery = configure_celery(app)

To run the Flask application, set the FLASK_APP environment variable to application:create_full_app().

To run the Celery worker, point the --app parameter of the `celery worker` command at worker.celery.
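For example, running both processes locally from the project root might look like this (these are the same commands the docker-compose file below uses):

export FLASK_APP="application:create_full_app()"
flask run --host=0.0.0.0

celery worker --app=worker.celery --concurrency=1 --loglevel=info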

Configuration

Only minimal configuration is required for Celery to use SQS as the queue for background task messages. Really, all you have to do to get Celery to recognize SQS as the queue is specify the correct BROKER_URL and pass the name of the queue to CELERY_DEFAULT_QUEUE.

# config_dev.py (loaded via app.config.from_object('config_dev'))
SQS_NAME = 'test'

# Celery configuration
BROKER_URL = 'sqs://'
BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'polling_interval': 5,  # number of seconds to sleep between polls
    'wait_time_seconds': 5
}
CELERY_DEFAULT_QUEUE = SQS_NAME
CELERY_ENABLE_REMOTE_CONTROL = False
CELERY_SEND_EVENTS = False
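Note that BROKER_URL = 'sqs://' with no credentials embedded tells Celery (via Kombu) to fall back to the standard boto credential chain: environment variables, the ~/.aws/credentials file mounted in the docker-compose file below, or an IAM role on the server. If credentials ever do need to be embedded in the broker URL, a sketch would look like the following; the environment variable names are the standard AWS ones, and the keys must be URL-quoted:

import os

from kombu.utils.url import safequote

# Only needed when the boto credential chain cannot be used
aws_access_key = safequote(os.environ['AWS_ACCESS_KEY_ID'])
aws_secret_key = safequote(os.environ['AWS_SECRET_ACCESS_KEY'])

BROKER_URL = 'sqs://{}:{}@'.format(aws_access_key, aws_secret_key)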

Containerizing our application and worker

Taking advantage of the independent scalability that a Celery background worker affords means creating two Docker images: an application image and a worker image. Each image can then be deployed on its own server. This is accomplished through a `docker-compose` file.

# docker-compose.yml
version: "3.3"

services:
  web:
    build:
      context: .
      args:
        blduser: notused
    image: application
    # no user set here because there is no need to run as something other than the root user
    command: [flask, run, --host=0.0.0.0]
    ports:
      - 8000:5000
    # the volume below allows the developer to pass AWS credentials for local development
    # before the container is moved to a server that sits behind IAM roles
    volumes:
      - ~/.aws/credentials:/home/celeryworker/.aws/credentials:ro

  worker:
    build:
      context: .
      args:
        blduser: celeryworker
    image: worker
    # user below sets the user of the container, which removes a RuntimeWarning from Celery
    user: celeryworker
    command: [celery, worker, --app=worker.celery, --concurrency=1, --loglevel=info]
    # the volume below allows the developer to pass AWS credentials for local development
    # before the container is moved to a server that sits behind IAM roles
    volumes:
      - ~/.aws/credentials:/home/celeryworker/.aws/credentials:ro

In addition to making it easy to create multiple containers, the docker-compose file offers other advantages, such as:

  • The ability to specify which Dockerfile to use, even a separate Dockerfile per service.
  • The ability to define variables in a `.env` file that the docker-compose file picks up automatically (see the sketch after this list).
  • An easy way to pass the local ~/.aws/credentials into the services, which helps avoid accidentally checking credentials into a repository.
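As a minimal sketch of the second point, a `.env` file placed next to the docker-compose file can hold values that docker-compose substitutes into the file; the variable name below is illustrative, not something the repository defines:

# .env
HOST_PORT=8000

With that in place, the ports entry for the web service could read `- ${HOST_PORT}:5000` instead of hard-coding 8000.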

Note that in this setup the Dockerfile is used to set the environment variables necessary to run the Flask application, i.e. FLASK_APP and FLASK_ENV.
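As a sketch, the relevant lines of the Dockerfile might look like this (the FLASK_ENV value is an assumption; the rest of the Dockerfile is omitted):

# Dockerfile (fragment)
ENV FLASK_APP="application:create_full_app()"
ENV FLASK_ENV=development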

General notes

By default Celery will delete a task off the queue as soon as the queue message is picked up by the worker. This is called “early acknowledgement”, and it is generally not the desired behavior. If instead we want Celery to only delete the task off the queue at completion or upon an error, we have to switch the worker to “late acknowledgement” by setting the Celery parameter `task_acks_late` to true. This requires carefully crafting the task to be idempotent, so that the work is only done once even if the message is delivered to multiple workers.
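In the old-style uppercase settings used in the configuration file above, that looks like this (the new-style lowercase name is `task_acks_late`):

# Delete the message from the queue only after the task completes or errors out
CELERY_ACKS_LATE = True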

See https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b for details.

According to https://dzone.com/articles/using-sqs-with-celery there are two Celery parameters to be mindful of when using SQS as the queue backend: worker_send_task_events and worker_enable_remote_control. When left at their defaults (true), Celery may create a new queue for each node in AWS, which is undesired behavior. Therefore both of these parameters should be set to false, which is exactly what the CELERY_SEND_EVENTS and CELERY_ENABLE_REMOTE_CONTROL lines in the configuration above do.

Code Repo

The code presented in this article can be found on my GitHub at https://github.com/clashofphish/learnFlask/tree/master/lrnflsk.
