How To Mock Celery Tasks With Pytest (Step-by-Step Guide with Examples)
Asynchronous task processing has become a cornerstone for building efficient and scalable applications in today’s fast-paced world.
Users perform actions and expect results near real-time. This can be efficiently handled with distributed computing and asynchronous task processing.
At the heart of this asynchronous revolution lies Celery, a powerful and popular distributed task queue framework for Python.
Celery empowers you to offload time-consuming tasks to the background, allowing applications to remain responsive and performant while handling resource-intensive operations.
While Celery simplifies the implementation of asynchronous workflows, testing them can be challenging.
With Unit Testing, you often encounter the need to control, isolate, and validate individual components of your application.
Celery relies on a message broker such as RabbitMQ or Redis, which introduces infrastructure coupling and makes it hard to test tasks in isolation.
In this article, we’ll explore the challenges associated with testing Celery tasks the traditional way.
We’ll also introduce and apply the concept of mocking, a powerful technique that allows you to simulate Celery task execution in a controlled environment.
As we progress through this article, you’ll discover how to create reliable and repeatable tests for your Celery tasks, ensuring that your applications function flawlessly in real-world scenarios.
So let’s get started.
What Is Celery?
If you’re unfamiliar with Celery and mocking, please read through the next sections. Otherwise, feel free to skip ahead and jump straight to the example code.
Celery is a versatile and widely used distributed task queue framework for handling asynchronous tasks efficiently.
These tasks, often time-consuming or resource-intensive, can be pushed to the background, allowing the main application to remain responsive and agile.
Celery’s strength lies in its ability to distribute tasks across multiple workers, making it suitable for handling a variety of workloads in parallel.
With features like task prioritization, retries, and scheduling, Celery is great for building scalable and robust applications that demand seamless asynchronous processing.
From the docs, here’s a basic example:
from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'
The above code creates a Celery app named hello and a task, also named hello. If you’re familiar with Flask or other frameworks, it’s something along those lines.
Celery also needs a message broker. RabbitMQ and Redis are the two most popular choices, but you can also use AWS SQS, GCP PubSub and so on.
The hello task returns the hello world message.
You call the hello task with the delay() method to submit it for execution.
There are several ways to check on a submitted task, for example the get() method and the status attribute.
We’ll look into these in more detail with our example code.
What Is Mocking Or Patching?
Mocking or patching is a technique used to emulate or substitute certain components of the system, often to create controlled and predictable testing environments.
Mocking allows you to replace real, external dependencies like databases, external APIs, or, in this case, Celery tasks, with simulated versions.
It involves creating simulated or “mock” objects to stand in for real components of a system.
By substituting real components with mock versions, you can simulate various scenarios, test edge cases, and verify the correctness of your code without relying on external systems.
For Celery task testing with Pytest, this means substituting the actual calling of Celery tasks with mock implementations.
This technique ensures that tests remain consistent and reliable, helping identify and resolve issues in the application’s asynchronous workflows.
We covered mocking at great length in our article on Pytest Mocking and Pytest Monkeypatching.
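To make the idea concrete, here is a minimal sketch that uses only the standard library's unittest.mock. PaymentGateway and checkout are hypothetical names invented for this illustration; they stand in for any external dependency and the code that calls it.

```python
from unittest import mock


class PaymentGateway:
    """Hypothetical external dependency that would normally call a remote API."""

    def charge(self, amount: int) -> str:
        raise RuntimeError("network access is not available in tests")


def checkout(gateway: PaymentGateway, amount: int) -> str:
    """Code under test: it depends on the gateway but holds the logic we care about."""
    receipt = gateway.charge(amount)
    return f"paid:{receipt}"


def run_checkout_with_mock() -> str:
    gateway = PaymentGateway()
    # Swap the real charge() for a mock that returns a canned receipt.
    with mock.patch.object(gateway, "charge", return_value="r-123") as fake_charge:
        outcome = checkout(gateway, 50)
        # The mock records how it was called, so the interaction can be verified.
        fake_charge.assert_called_once_with(50)
    return outcome
```

The real charge() is never executed, so the test stays fast and deterministic while still checking that checkout passes the right amount along.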
What You’ll Learn
By the end of this article, you should:
- Understand how to use Celery to perform asynchronous task execution.
- Learn how to unit test Celery Tasks the traditional way.
- Understand the challenges of testing Celery Tasks.
- Be able to mock and simulate testing of Celery Tasks.
- Unit test Async Celery Tasks.
Excited? Let’s get into it.
Project Set Up
Prerequisites
To achieve the above objectives, the following is recommended:
- Basic knowledge of Python and Pytest (must have)
- Basics of Mocking (nice to have)
Getting Started
Here’s our repo structure
To get started, clone the repo here, or you can create your own repo by creating a folder and running git init to initialise it.
In this project, we’ll be using Python 3.11.5.
Install the dependencies with
pip install -r requirements.txt
The most important packages are pytest, pytest-celery (to activate the Celery testing module) and pytest-mock (to allow us to use unittest.mock).
Basic Celery Task Example
Let’s start with a very simple example.
At the root of the repo, we have a tasks.py Python file that includes the Celery app and the tasks that we define.
tasks.py
from time import sleep
from celery import Celery, shared_task

app = Celery("tasks", backend="rpc://", broker="pyamqp://guest@localhost//")


@app.task
def reverse_string(my_string: str):
    return my_string[::-1]
We have a simple Python task that reverses a string.
In line with the Celery Getting Started tutorials, we’ll use RabbitMQ as a message queue broker on our local machine.
We have defined the broker="pyamqp://guest@localhost//" value to tell Celery to use the local RabbitMQ container as its broker.
Start RabbitMQ Locally
To download and run the official RabbitMQ Docker image, from the terminal run
docker run -d -p 5672:5672 rabbitmq
You should get something like this.
Run Celery Task
To run the Celery tasks, we can use the delay() method.
Once your RabbitMQ (or another broker) is running, start the Celery worker using
celery -A tasks worker --loglevel=info
Here, tasks is the file where we’ve defined our Celery app and tasks. You should see something like this, with the message celery@Erics-MacBook-Pro.local ready.
Open up a new terminal and start a Python shell.
Here we’ve called reverse_string.delay("Eric") and can see an AsyncResult object reference. Meanwhile, in the Celery logs, we can see the result.
You can access the result using the get() method.
There are many other commands that allow you to do so much more, but for now, let’s move on to the testing.
Unit Testing Celery Tasks
Let’s look at how we can execute our tests using Pytest.
tests/test_task.py
import celery
import pytest
from tasks import reverse_string


## Running Celery Task with Backend and Broker (RabbitMQ)
@pytest.mark.celery(result_backend="rpc://")
def test_reverse_string(celery_app, celery_worker):
    assert reverse_string.delay("hello").get(timeout=10) == "olleh"
We’ve used the pytest.mark.celery() marker to pass specific config to our unit test (use the rpc backend).
Let’s run this.
We can see that our test timed out. This is because Celery expects a backend broker config and can’t find one.
Let’s fire up the RabbitMQ container using
docker run -d -p 5672:5672 rabbitmq
Hmm same error, why’s that?
It’s because we are missing the key Celery App config.
Now navigate to your conftest.py file and add the below.
tests/conftest.py
import pytest


@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "pyamqp://", "result_backend": "rpc://"}
With this config, we tell Pytest to use the RabbitMQ broker with an rpc backend.
If you’re unfamiliar with conftest.py, it’s a file that holds common fixtures or methods that you wish to share across your test suite.
This article on Pytest Conftest has you covered.
Now let’s try to run it.
Voilà! It works.
While this is good enough, you can see how a simple task test took 1.5 seconds to run. Not to mention it requires you to connect to an actual broker.
Although not a problem for local development, it can be cumbersome to do on a CI/CD pipeline requiring access and unnecessary infrastructure.
If you’d like to learn more about running Pytest with GitHub Actions, this article covers all you need to know about CI/CD Integration.
How can we remove this dependency on external infrastructure?
The answer lies in Mocking, one of testing’s most powerful features.
Mocking Celery Tasks
We spoke about mocking and patching briefly above, so how do you use it in this context? Let’s take a look.
tests/test_task.py
import pytest
from tasks import reverse_string


@pytest.fixture
def mocked_reverse_string(mocker):
    # Mock the Celery task
    return mocker.patch("tasks.reverse_string.delay", return_value="olleh")


def test_reverse_string_mocked(mocked_reverse_string):
    # Call the Celery task
    result = reverse_string.delay("hello")

    # Check if the Celery task was called once with the correct arguments
    mocked_reverse_string.assert_called_once_with("hello")

    # Check the result of the Celery task
    assert result == "olleh"
Let’s break down the above code.
First, we create a simple Pytest fixture, mocked_reverse_string, that patches the delay method, which is the actual method called to submit a task to Celery.
If you need a refresher, these articles on Pytest Fixtures and Fixture Scopes are a good read.
This patch returns the expected value, i.e. olleh in this case. Note that the patched delay returns the string directly rather than an AsyncResult, so there is no get() call in this test.
We pass this fixture to our unit test and then call the actual Celery task with its argument "hello": reverse_string.delay("hello").
Now it’s time to assert.
First, we want to assert that the delay method was called with the argument "hello".
Then we want to assert that the result is correct (i.e. we get the patched result).
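In application code, the caller usually goes one step further and calls get() on whatever delay() returns. A possible way to mock that chain is sketched below. To keep it runnable without Celery or a broker, a MagicMock stands in for the task itself, and reverse_via_task is a hypothetical caller invented for this illustration.

```python
from unittest.mock import MagicMock


def reverse_via_task(task, text: str) -> str:
    """Hypothetical caller: submit the task, then block until the result arrives."""
    async_result = task.delay(text)
    return async_result.get(timeout=10)


def check_reverse_via_task() -> str:
    # MagicMock stands in for the Celery task object (an assumption of this sketch).
    fake_task = MagicMock(name="reverse_string")
    # delay() returns a mock result object whose get() yields the canned value.
    fake_task.delay.return_value.get.return_value = "olleh"

    value = reverse_via_task(fake_task, "hello")

    # Verify both links of the chain: the submission and the wait for the result.
    fake_task.delay.assert_called_once_with("hello")
    fake_task.delay.return_value.get.assert_called_once_with(timeout=10)
    return value
```

Configuring return_value.get.return_value keeps the mock's shape close to what the calling code expects, so the code under test does not need to change for the test.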
Let’s run this.
Nice! And it only took 0.22 seconds.
Unit Tests for Celery Tasks With Async Functions
Having looked at a basic example of how to test a Celery Task, let’s get a bit more advanced.
Celery takes care of executing our normal Python functions asynchronously. But what if we already have async functions and want to include them as Celery tasks?
And how do we test that those tasks were called?
Consider an example,
tasks.py
import asyncio


# `app` is the Celery app defined earlier in this file.
# bind=True passes the task instance as the first argument (self).
@app.task(bind=True)
def add(self, x, y):
    # Your actual async function
    async def async_add(x, y):
        return x + y

    loop = asyncio.get_event_loop()
    # If the current loop has been closed, create a new one and close it after the task
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(async_add(x, y))
        finally:
            loop.close()
    else:
        result = loop.run_until_complete(async_add(x, y))
    return result
Our tasks file contains the add task, which wraps an async function, async_add.
We get the event loop and run the coroutine on it until it completes. If the loop has been closed, we create a new one.
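As a simpler alternative to managing the loop by hand, the sketch below uses asyncio.run, which creates a fresh event loop, runs the coroutine and closes the loop afterwards. This is not the approach used in the task above, and add_sync is a hypothetical plain function rather than a registered Celery task. It assumes no event loop is already running in the calling thread, since asyncio.run raises RuntimeError in that case.

```python
import asyncio


async def async_add(x: int, y: int) -> int:
    # Yield control once so this behaves like a real coroutine.
    await asyncio.sleep(0)
    return x + y


def add_sync(x: int, y: int) -> int:
    """Synchronous wrapper: run the coroutine to completion on a new event loop."""
    return asyncio.run(async_add(x, y))
```

Because the wrapper is an ordinary function, its logic can be tested directly, independently of how the task is submitted.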
Firing up our Celery worker and running
print(add.delay(5, 9).get())
we get the result 14.
This proves that our task works with Celery.
Unit Testing Async Tasks — Mocking
Given a working async task, it’s time to Unit Test it.
But as we saw before, we don’t want to have to maintain a message broker or any external infrastructure.
So, what’s the solution?
Just like normal synchronous functions, async function tasks can also be mocked.
Here’s an example.
from tasks import add


def test_add(mocker):
    # Arrange
    x = 5
    y = 7

    # Mock the Celery task's apply_async method
    mocked_add = mocker.patch("tasks.add.apply_async", autospec=True)

    # Act
    result = add.apply_async(args=[x, y])

    # Assert
    mocked_add.assert_called_once_with(args=[x, y])
Here we mock the Celery task’s apply_async method and assert that this method has been called with the expected arguments x and y.
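The autospec=True argument in the patch above makes the mock copy the signature of the object it replaces, so a call that the real method would reject also fails on the mock. The sketch below shows the effect with the standard library's create_autospec. The apply_async function here is a hypothetical stand-in with a simplified signature, not Celery's real method.

```python
from unittest.mock import create_autospec


def apply_async(args=None, kwargs=None, countdown=None):
    """Hypothetical stand-in; the parameter list is simplified for illustration."""
    raise RuntimeError("would need a broker")


def autospec_rejects_bad_call() -> bool:
    fake = create_autospec(apply_async)

    # A call that matches the signature is accepted and recorded.
    fake(args=[5, 7])
    fake.assert_called_once_with(args=[5, 7])

    # A misspelt keyword is rejected, just as the real function would reject it.
    try:
        fake(arguments=[5, 7])
    except TypeError:
        return True
    return False
```

Without autospec, a plain mock would accept the misspelt keyword silently and the test could pass against code that would fail in production.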
Success!
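Mocks can also simulate failures. The sketch below uses side_effect to make a submission raise, then checks how the calling code reacts. BrokerUnavailable and enqueue_add are hypothetical names invented for this illustration, and a MagicMock again stands in for the task so that the example runs without Celery.

```python
from unittest.mock import MagicMock


class BrokerUnavailable(Exception):
    """Hypothetical error standing in for a failed connection to the broker."""


def enqueue_add(task, x: int, y: int) -> str:
    """Hypothetical caller that degrades gracefully when submission fails."""
    try:
        task.apply_async(args=[x, y])
    except BrokerUnavailable:
        return "queued-for-retry"
    return "submitted"


def simulate_outcomes() -> tuple:
    ok_task = MagicMock()
    failing_task = MagicMock()
    # side_effect makes the mocked method raise instead of returning a value.
    failing_task.apply_async.side_effect = BrokerUnavailable("broker down")
    return enqueue_add(ok_task, 5, 7), enqueue_add(failing_task, 5, 7)
```

The same pattern works with mocker.patch by passing side_effect when patching the task method.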
Conclusion
In this article, we covered a few key concepts related to Celery async task execution and mocking.
First, we looked at what Celery is and how to write a simple task that reverses a string.
Then we tested it and learnt how to mock it to remove any external dependencies e.g. external message queue brokers.
Next, we did the same for async functions and tasks including mocking that the required method was called with our arguments.
Mocking allows for faster test execution and grants control over the task’s behaviour, making it feasible to simulate different scenarios, be it success, failure, or other edge cases.
With this knowledge, you can harness the power of Celery to scale and optimize your applications with solid and isolated testing.
This is incredibly powerful and will help you build and validate Celery tasks quickly without the need for external message brokers.
If you have ideas for improvement or would like me to cover anything specific, please send me a message via Twitter, GitHub or Email.
Till the next time… Cheers!